Migrating posix event pollers to use new event poller interface (#30419)

* Migrating posix event pollers to use new event poller interface

* add inline attributes

* Automated change: Fix sanity tests

* remove ref/unref from iomgr engine closure and add custom closure types internal to the pollers

* updating time util usage

* use unique_ptrs

* update comments

* Automated change: Fix sanity tests

* fix review comments

* review comments

* cleanup

* update comments

* fix

* cleanup

* update comments

* Automated change: Fix sanity tests

* fix misleading comments

* bug fixes

* fix

* fix

* revert some changes for bug fixes and allow spurious wakeups for poll based poller

* sanity

* fix

* review comments

* fix

* comment

* remove re-defined function

* fix review comments

* fix windows iocp build issue due to removed function

* change Milliseconds return type

* remove header

* regenerate projects

* fix sanity

* fix sanity

* Automated change: Fix sanity tests

* delete unused file

* build issue

* cleanup

Co-authored-by: Vignesh2208 <Vignesh2208@users.noreply.github.com>
parent e2ceb1368a
commit 029f945504
  1. BUILD (17)
  2. CMakeLists.txt (2)
  3. Makefile (2)
  4. build_autogenerated.yaml (5)
  5. config.m4 (1)
  6. config.w32 (1)
  7. gRPC-C++.podspec (2)
  8. gRPC-Core.podspec (3)
  9. grpc.gemspec (2)
  10. grpc.gyp (2)
  11. package.xml (2)
  12. src/core/lib/event_engine/posix_engine/ev_epoll1_linux.cc (197)
  13. src/core/lib/event_engine/posix_engine/ev_epoll1_linux.h (40)
  14. src/core/lib/event_engine/posix_engine/ev_poll_posix.cc (255)
  15. src/core/lib/event_engine/posix_engine/ev_poll_posix.h (27)
  16. src/core/lib/event_engine/posix_engine/event_poller.h (59)
  17. src/core/lib/event_engine/posix_engine/event_poller_posix_default.cc (4)
  18. src/core/lib/event_engine/posix_engine/event_poller_posix_default.h (4)
  19. src/core/lib/event_engine/posix_engine/lockfree_event.cc (6)
  20. src/core/lib/event_engine/posix_engine/lockfree_event.h (2)
  21. src/core/lib/event_engine/posix_engine/posix_engine_closure.h (28)
  22. src/core/lib/event_engine/time_util.cc (2)
  23. src/core/lib/event_engine/time_util.h (4)
  24. src/core/lib/event_engine/utils.cc (5)
  25. src/core/lib/event_engine/utils.h (4)
  26. src/core/lib/event_engine/windows/iocp.cc (2)
  27. src/python/grpcio/grpc_core_dependencies.py (1)
  28. test/core/event_engine/posix/BUILD (2)
  29. test/core/event_engine/posix/event_poller_posix_test.cc (270)
  30. test/core/event_engine/posix/lock_free_event_test.cc (12)
  31. tools/doxygen/Doxyfile.c++.internal (2)
  32. tools/doxygen/Doxyfile.core.internal (2)

17
BUILD

@ -2422,14 +2422,15 @@ grpc_cc_library(
"src/core/lib/event_engine/posix_engine/event_poller.h",
],
external_deps = [
"absl/functional:any_invocable",
"absl/status",
"absl/strings",
],
deps = [
"event_engine_base_hdrs",
"event_engine_poller",
"gpr_platform",
"posix_event_engine_closure",
"time",
],
)
@ -2548,6 +2549,7 @@ grpc_cc_library(
],
external_deps = [
"absl/base:core_headers",
"absl/functional:any_invocable",
"absl/memory",
"absl/status",
"absl/status:statusor",
@ -2555,15 +2557,17 @@ grpc_cc_library(
"absl/synchronization",
],
deps = [
"common_event_engine_closures",
"event_engine_base_hdrs",
"event_engine_poller",
"event_engine_time_util",
"gpr",
"gpr_codegen",
"iomgr_port",
"posix_event_engine_closure",
"posix_event_engine_event_poller",
"posix_event_engine_lockfree_event",
"posix_event_engine_wakeup_fd_posix",
"posix_event_engine_wakeup_fd_posix_default",
"time",
],
)
@ -2577,12 +2581,17 @@ grpc_cc_library(
],
external_deps = [
"absl/base:core_headers",
"absl/functional:any_invocable",
"absl/status",
"absl/status:statusor",
"absl/strings",
"absl/synchronization",
],
deps = [
"common_event_engine_closures",
"event_engine_base_hdrs",
"event_engine_poller",
"event_engine_time_util",
"gpr",
"gpr_codegen",
"iomgr_port",
@ -2737,8 +2746,8 @@ grpc_cc_library(
"event_engine_executor",
"event_engine_poller",
"event_engine_socket_notifier",
"event_engine_time_util",
"event_engine_trace",
"event_engine_utils",
"gpr",
"gpr_platform",
],

2
CMakeLists.txt generated

@ -2119,6 +2119,7 @@ add_library(grpc
src/core/lib/event_engine/slice.cc
src/core/lib/event_engine/slice_buffer.cc
src/core/lib/event_engine/thread_pool.cc
src/core/lib/event_engine/time_util.cc
src/core/lib/event_engine/trace.cc
src/core/lib/event_engine/utils.cc
src/core/lib/event_engine/windows/iocp.cc
@ -2754,6 +2755,7 @@ add_library(grpc_unsecure
src/core/lib/event_engine/slice.cc
src/core/lib/event_engine/slice_buffer.cc
src/core/lib/event_engine/thread_pool.cc
src/core/lib/event_engine/time_util.cc
src/core/lib/event_engine/trace.cc
src/core/lib/event_engine/utils.cc
src/core/lib/event_engine/windows/iocp.cc

2
Makefile generated

@ -1435,6 +1435,7 @@ LIBGRPC_SRC = \
src/core/lib/event_engine/slice.cc \
src/core/lib/event_engine/slice_buffer.cc \
src/core/lib/event_engine/thread_pool.cc \
src/core/lib/event_engine/time_util.cc \
src/core/lib/event_engine/trace.cc \
src/core/lib/event_engine/utils.cc \
src/core/lib/event_engine/windows/iocp.cc \
@ -1934,6 +1935,7 @@ LIBGRPC_UNSECURE_SRC = \
src/core/lib/event_engine/slice.cc \
src/core/lib/event_engine/slice_buffer.cc \
src/core/lib/event_engine/thread_pool.cc \
src/core/lib/event_engine/time_util.cc \
src/core/lib/event_engine/trace.cc \
src/core/lib/event_engine/utils.cc \
src/core/lib/event_engine/windows/iocp.cc \

5
build_autogenerated.yaml

@ -790,6 +790,7 @@ libs:
- src/core/lib/event_engine/promise.h
- src/core/lib/event_engine/socket_notifier.h
- src/core/lib/event_engine/thread_pool.h
- src/core/lib/event_engine/time_util.h
- src/core/lib/event_engine/trace.h
- src/core/lib/event_engine/utils.h
- src/core/lib/event_engine/windows/iocp.h
@ -1492,6 +1493,7 @@ libs:
- src/core/lib/event_engine/slice.cc
- src/core/lib/event_engine/slice_buffer.cc
- src/core/lib/event_engine/thread_pool.cc
- src/core/lib/event_engine/time_util.cc
- src/core/lib/event_engine/trace.cc
- src/core/lib/event_engine/utils.cc
- src/core/lib/event_engine/windows/iocp.cc
@ -2007,6 +2009,7 @@ libs:
- src/core/lib/event_engine/promise.h
- src/core/lib/event_engine/socket_notifier.h
- src/core/lib/event_engine/thread_pool.h
- src/core/lib/event_engine/time_util.h
- src/core/lib/event_engine/trace.h
- src/core/lib/event_engine/utils.h
- src/core/lib/event_engine/windows/iocp.h
@ -2349,6 +2352,7 @@ libs:
- src/core/lib/event_engine/slice.cc
- src/core/lib/event_engine/slice_buffer.cc
- src/core/lib/event_engine/thread_pool.cc
- src/core/lib/event_engine/time_util.cc
- src/core/lib/event_engine/trace.cc
- src/core/lib/event_engine/utils.cc
- src/core/lib/event_engine/windows/iocp.cc
@ -5799,6 +5803,7 @@ targets:
run: false
language: c++
headers:
- src/core/lib/event_engine/common_closures.h
- src/core/lib/event_engine/posix_engine/ev_epoll1_linux.h
- src/core/lib/event_engine/posix_engine/ev_poll_posix.h
- src/core/lib/event_engine/posix_engine/event_poller.h

1
config.m4 generated

@ -483,6 +483,7 @@ if test "$PHP_GRPC" != "no"; then
src/core/lib/event_engine/slice.cc \
src/core/lib/event_engine/slice_buffer.cc \
src/core/lib/event_engine/thread_pool.cc \
src/core/lib/event_engine/time_util.cc \
src/core/lib/event_engine/trace.cc \
src/core/lib/event_engine/utils.cc \
src/core/lib/event_engine/windows/iocp.cc \

1
config.w32 generated

@ -449,6 +449,7 @@ if (PHP_GRPC != "no") {
"src\\core\\lib\\event_engine\\slice.cc " +
"src\\core\\lib\\event_engine\\slice_buffer.cc " +
"src\\core\\lib\\event_engine\\thread_pool.cc " +
"src\\core\\lib\\event_engine\\time_util.cc " +
"src\\core\\lib\\event_engine\\trace.cc " +
"src\\core\\lib\\event_engine\\utils.cc " +
"src\\core\\lib\\event_engine\\windows\\iocp.cc " +

2
gRPC-C++.podspec generated

@ -695,6 +695,7 @@ Pod::Spec.new do |s|
'src/core/lib/event_engine/promise.h',
'src/core/lib/event_engine/socket_notifier.h',
'src/core/lib/event_engine/thread_pool.h',
'src/core/lib/event_engine/time_util.h',
'src/core/lib/event_engine/trace.h',
'src/core/lib/event_engine/utils.h',
'src/core/lib/event_engine/windows/iocp.h',
@ -1553,6 +1554,7 @@ Pod::Spec.new do |s|
'src/core/lib/event_engine/promise.h',
'src/core/lib/event_engine/socket_notifier.h',
'src/core/lib/event_engine/thread_pool.h',
'src/core/lib/event_engine/time_util.h',
'src/core/lib/event_engine/trace.h',
'src/core/lib/event_engine/utils.h',
'src/core/lib/event_engine/windows/iocp.h',

3
gRPC-Core.podspec generated

@ -1076,6 +1076,8 @@ Pod::Spec.new do |s|
'src/core/lib/event_engine/socket_notifier.h',
'src/core/lib/event_engine/thread_pool.cc',
'src/core/lib/event_engine/thread_pool.h',
'src/core/lib/event_engine/time_util.cc',
'src/core/lib/event_engine/time_util.h',
'src/core/lib/event_engine/trace.cc',
'src/core/lib/event_engine/trace.h',
'src/core/lib/event_engine/utils.cc',
@ -2176,6 +2178,7 @@ Pod::Spec.new do |s|
'src/core/lib/event_engine/promise.h',
'src/core/lib/event_engine/socket_notifier.h',
'src/core/lib/event_engine/thread_pool.h',
'src/core/lib/event_engine/time_util.h',
'src/core/lib/event_engine/trace.h',
'src/core/lib/event_engine/utils.h',
'src/core/lib/event_engine/windows/iocp.h',

2
grpc.gemspec generated

@ -989,6 +989,8 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/event_engine/socket_notifier.h )
s.files += %w( src/core/lib/event_engine/thread_pool.cc )
s.files += %w( src/core/lib/event_engine/thread_pool.h )
s.files += %w( src/core/lib/event_engine/time_util.cc )
s.files += %w( src/core/lib/event_engine/time_util.h )
s.files += %w( src/core/lib/event_engine/trace.cc )
s.files += %w( src/core/lib/event_engine/trace.h )
s.files += %w( src/core/lib/event_engine/utils.cc )

2
grpc.gyp generated

@ -814,6 +814,7 @@
'src/core/lib/event_engine/slice.cc',
'src/core/lib/event_engine/slice_buffer.cc',
'src/core/lib/event_engine/thread_pool.cc',
'src/core/lib/event_engine/time_util.cc',
'src/core/lib/event_engine/trace.cc',
'src/core/lib/event_engine/utils.cc',
'src/core/lib/event_engine/windows/iocp.cc',
@ -1258,6 +1259,7 @@
'src/core/lib/event_engine/slice.cc',
'src/core/lib/event_engine/slice_buffer.cc',
'src/core/lib/event_engine/thread_pool.cc',
'src/core/lib/event_engine/time_util.cc',
'src/core/lib/event_engine/trace.cc',
'src/core/lib/event_engine/utils.cc',
'src/core/lib/event_engine/windows/iocp.cc',

2
package.xml generated

@ -971,6 +971,8 @@
<file baseinstalldir="/" name="src/core/lib/event_engine/socket_notifier.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/thread_pool.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/thread_pool.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/time_util.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/time_util.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/trace.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/trace.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/utils.cc" role="src" />

197
src/core/lib/event_engine/posix_engine/ev_epoll1_linux.cc

@ -17,18 +17,20 @@
#include <stdint.h>
#include <algorithm>
#include <atomic>
#include <memory>
#include "absl/functional/any_invocable.h"
#include "absl/memory/memory.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/str_cat.h"
#include <grpc/impl/codegen/gpr_types.h>
#include <grpc/event_engine/event_engine.h>
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
#include <grpc/support/time.h>
#include "src/core/lib/event_engine/poller.h"
#include "src/core/lib/event_engine/time_util.h"
#include "src/core/lib/iomgr/port.h"
// This polling engine is only relevant on linux kernels supporting epoll
@ -41,17 +43,15 @@
#include <sys/socket.h>
#include <unistd.h>
#include <vector>
#include "absl/synchronization/mutex.h"
#include "src/core/lib/event_engine/common_closures.h"
#include "src/core/lib/event_engine/posix_engine/event_poller.h"
#include "src/core/lib/event_engine/posix_engine/lockfree_event.h"
#include "src/core/lib/event_engine/posix_engine/posix_engine_closure.h"
#include "src/core/lib/event_engine/posix_engine/wakeup_fd_posix.h"
#include "src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.h"
#include "src/core/lib/gprpp/fork.h"
#include "src/core/lib/gprpp/time.h"
using ::grpc_event_engine::posix_engine::LockfreeEvent;
using ::grpc_event_engine::posix_engine::WakeupFd;
@ -61,13 +61,19 @@ using ::grpc_event_engine::posix_engine::WakeupFd;
namespace grpc_event_engine {
namespace posix_engine {
using ::grpc_event_engine::experimental::AnyInvocableClosure;
using ::grpc_event_engine::experimental::EventEngine;
using ::grpc_event_engine::experimental::Poller;
using ::grpc_event_engine::posix_engine::LockfreeEvent;
using ::grpc_event_engine::posix_engine::WakeupFd;
class Epoll1EventHandle : public EventHandle {
public:
Epoll1EventHandle(int fd, Epoll1Poller* poller)
: fd_(fd),
pending_actions_(0),
list_(),
list_(this),
poller_(poller),
exec_actions_closure_([this]() { ExecutePendingActions(); }),
read_closure_(absl::make_unique<LockfreeEvent>(poller->GetScheduler())),
write_closure_(
absl::make_unique<LockfreeEvent>(poller->GetScheduler())),
@ -76,47 +82,77 @@ class Epoll1EventHandle : public EventHandle {
read_closure_->InitEvent();
write_closure_->InitEvent();
error_closure_->InitEvent();
pending_actions_ = 0;
pending_read_.store(false, std::memory_order_relaxed);
pending_write_.store(false, std::memory_order_relaxed);
pending_error_.store(false, std::memory_order_relaxed);
}
void ReInit(int fd) {
fd_ = fd;
read_closure_->InitEvent();
write_closure_->InitEvent();
error_closure_->InitEvent();
pending_read_.store(false, std::memory_order_relaxed);
pending_write_.store(false, std::memory_order_relaxed);
pending_error_.store(false, std::memory_order_relaxed);
}
Epoll1Poller* Poller() { return poller_; }
void SetPendingActions(bool pending_read, bool pending_write,
bool pending_error) {
pending_actions_ |= pending_read;
EventEngine::Closure* SetPendingActions(bool pending_read, bool pending_write,
bool pending_error) {
// Another thread may be executing ExecutePendingActions() at this point.
// This is possible, for instance, if one invocation of Work(...) sets an
// fd to be readable while the next invocation of Work(...) sets the fd to
// be writable. While the second invocation is running,
// ExecutePendingActions() from the first invocation may execute in
// parallel and read the pending_<***>_ variables. So we need to use
// atomics to manipulate the pending_<***>_ variables.
if (pending_read) {
pending_read_.store(true, std::memory_order_release);
}
if (pending_write) {
pending_actions_ |= (1 << 2);
pending_write_.store(true, std::memory_order_release);
}
if (pending_error) {
pending_actions_ |= (1 << 3);
pending_error_.store(true, std::memory_order_release);
}
if (pending_read || pending_write || pending_error) {
return &exec_actions_closure_;
}
return nullptr;
}
int WrappedFd() override { return fd_; }
void OrphanHandle(IomgrEngineClosure* on_done, int* release_fd,
void OrphanHandle(PosixEngineClosure* on_done, int* release_fd,
absl::string_view reason) override;
void ShutdownHandle(absl::Status why) override;
void NotifyOnRead(IomgrEngineClosure* on_read) override;
void NotifyOnWrite(IomgrEngineClosure* on_write) override;
void NotifyOnError(IomgrEngineClosure* on_error) override;
void NotifyOnRead(PosixEngineClosure* on_read) override;
void NotifyOnWrite(PosixEngineClosure* on_write) override;
void NotifyOnError(PosixEngineClosure* on_error) override;
void SetReadable() override;
void SetWritable() override;
void SetHasError() override;
bool IsHandleShutdown() override;
void ExecutePendingActions() override {
if (pending_actions_ & 1UL) {
inline void ExecutePendingActions() {
// These may execute in parallel with ShutdownHandle. That's not an issue
// because the lockfree event implementation should be able to handle it.
if (pending_read_.exchange(false, std::memory_order_acq_rel)) {
read_closure_->SetReady();
}
if ((pending_actions_ >> 2) & 1UL) {
if (pending_write_.exchange(false, std::memory_order_acq_rel)) {
write_closure_->SetReady();
}
if ((pending_actions_ >> 3) & 1UL) {
if (pending_error_.exchange(false, std::memory_order_acq_rel)) {
error_closure_->SetReady();
}
pending_actions_ = 0;
}
absl::Mutex* mu() { return &mu_; }
LockfreeEvent* ReadClosure() { return read_closure_.get(); }
LockfreeEvent* WriteClosure() { return write_closure_.get(); }
LockfreeEvent* ErrorClosure() { return error_closure_.get(); }
Epoll1Poller::HandlesList& ForkFdListPos() { return list_; }
~Epoll1EventHandle() override = default;
private:
void HandleShutdownInternal(absl::Status why, bool releasing_fd);
@ -124,9 +160,14 @@ class Epoll1EventHandle : public EventHandle {
// required.
absl::Mutex mu_;
int fd_;
int pending_actions_;
// See Epoll1EventHandle::SetPendingActions for an explanation of why the
// pending_<***>_ variables need to be atomic.
std::atomic<bool> pending_read_{false};
std::atomic<bool> pending_write_{false};
std::atomic<bool> pending_error_{false};
Epoll1Poller::HandlesList list_;
Epoll1Poller* poller_;
AnyInvocableClosure exec_actions_closure_;
std::unique_ptr<LockfreeEvent> read_closure_;
std::unique_ptr<LockfreeEvent> write_closure_;
std::unique_ptr<LockfreeEvent> error_closure_;
@ -206,20 +247,6 @@ void ForkPollerListRemovePoller(Epoll1Poller* poller) {
}
}
int PollDeadlineToMillisTimeout(grpc_core::Timestamp millis) {
if (millis == grpc_core::Timestamp::InfFuture()) return -1;
grpc_core::Timestamp now =
grpc_core::Timestamp::FromTimespecRoundDown(gpr_now(GPR_CLOCK_MONOTONIC));
int64_t delta = (millis - now).millis();
if (delta > INT_MAX) {
return INT_MAX;
} else if (delta < 0) {
return 0;
} else {
return static_cast<int>(delta);
}
}
bool InitEpoll1PollerLinux();
// Called by the child process's post-fork handler to close open fds,
@ -269,7 +296,7 @@ bool InitEpoll1PollerLinux() {
} // namespace
void Epoll1EventHandle::OrphanHandle(IomgrEngineClosure* on_done,
void Epoll1EventHandle::OrphanHandle(PosixEngineClosure* on_done,
int* release_fd,
absl::string_view reason) {
bool is_release_fd = (release_fd != nullptr);
@ -295,12 +322,13 @@ void Epoll1EventHandle::OrphanHandle(IomgrEngineClosure* on_done,
write_closure_->DestroyEvent();
error_closure_->DestroyEvent();
}
pending_read_.store(false, std::memory_order_release);
pending_write_.store(false, std::memory_order_release);
pending_error_.store(false, std::memory_order_release);
{
absl::MutexLock lock(&poller_->mu_);
poller_->free_epoll1_handles_list_.push_back(this);
}
if (on_done != nullptr) {
on_done->SetStatus(absl::OkStatus());
poller_->GetScheduler()->Run(on_done);
@ -376,6 +404,7 @@ EventHandle* Epoll1Poller::CreateHandle(int fd, absl::string_view /*name*/,
new_handle = reinterpret_cast<Epoll1EventHandle*>(
free_epoll1_handles_list_.front());
free_epoll1_handles_list_.pop_front();
new_handle->ReInit(fd);
}
}
ForkFdListAddHandle(new_handle);
@ -397,10 +426,13 @@ EventHandle* Epoll1Poller::CreateHandle(int fd, absl::string_view /*name*/,
// Process the epoll events found by DoEpollWait() function.
// - g_epoll_set.cursor points to the index of the first event to be processed
// - This function then processes up-to MAX_EPOLL_EVENTS_PER_ITERATION and
// updates the g_epoll_set.cursor
absl::Status Epoll1Poller::ProcessEpollEvents(
int max_epoll_events_to_handle, std::vector<EventHandle*>& pending_events) {
// - This function then processes up to max_epoll_events_to_handle events and
// updates the g_epoll_set.cursor.
// It returns true if there was a Kick that forced invocation of this
// function. It also returns the list of closures to run to take action
// on file descriptors that became readable/writable.
bool Epoll1Poller::ProcessEpollEvents(int max_epoll_events_to_handle,
Poller::Events& pending_events) {
int64_t num_events = g_epoll_set_.num_events;
int64_t cursor = g_epoll_set_.cursor;
bool was_kicked = false;
@ -422,35 +454,37 @@ absl::Status Epoll1Poller::ProcessEpollEvents(
bool read_ev = (ev->events & (EPOLLIN | EPOLLPRI)) != 0;
bool write_ev = (ev->events & EPOLLOUT) != 0;
bool err_fallback = error && !track_err;
handle->SetPendingActions(read_ev || cancel || err_fallback,
write_ev || cancel || err_fallback,
error && !err_fallback);
pending_events.push_back(handle);
if (EventEngine::Closure* closure = handle->SetPendingActions(
read_ev || cancel || err_fallback,
write_ev || cancel || err_fallback, error && !err_fallback)) {
pending_events.push_back(closure);
}
}
}
g_epoll_set_.cursor = cursor;
return was_kicked ? absl::Status(absl::StatusCode::kInternal, "Kicked")
: absl::OkStatus();
return was_kicked;
}
// Do epoll_wait and store the events in g_epoll_set.events field. This does
// not "process" any of the events yet; that is done in ProcessEpollEvents().
// See ProcessEpollEvents() function for more details.
absl::Status Epoll1Poller::DoEpollWait(grpc_core::Timestamp deadline) {
// See ProcessEpollEvents() function for more details. It returns the number
// of events generated by epoll_wait.
int Epoll1Poller::DoEpollWait(EventEngine::Duration timeout) {
int r;
int timeout = PollDeadlineToMillisTimeout(deadline);
do {
r = epoll_wait(g_epoll_set_.epfd, g_epoll_set_.events, MAX_EPOLL_EVENTS,
timeout);
static_cast<int>(
grpc_event_engine::experimental::Milliseconds(timeout)));
} while (r < 0 && errno == EINTR);
if (r < 0) {
return absl::Status(absl::StatusCode::kInternal,
absl::StrCat("epoll_wait: ", strerror(errno)));
gpr_log(GPR_ERROR,
"(event_engine) Epoll1Poller:%p encountered epoll_wait error: %s",
this, strerror(errno));
GPR_ASSERT(false);
}
g_epoll_set_.num_events = r;
g_epoll_set_.cursor = 0;
return absl::OkStatus();
return r;
}
// Might be called multiple times
@ -469,15 +503,15 @@ bool Epoll1EventHandle::IsHandleShutdown() {
return read_closure_->IsShutdown();
}
void Epoll1EventHandle::NotifyOnRead(IomgrEngineClosure* on_read) {
void Epoll1EventHandle::NotifyOnRead(PosixEngineClosure* on_read) {
read_closure_->NotifyOn(on_read);
}
void Epoll1EventHandle::NotifyOnWrite(IomgrEngineClosure* on_write) {
void Epoll1EventHandle::NotifyOnWrite(PosixEngineClosure* on_write) {
write_closure_->NotifyOn(on_write);
}
void Epoll1EventHandle::NotifyOnError(IomgrEngineClosure* on_error) {
void Epoll1EventHandle::NotifyOnError(PosixEngineClosure* on_error) {
error_closure_->NotifyOn(on_error);
}
@ -487,24 +521,28 @@ void Epoll1EventHandle::SetWritable() { write_closure_->SetReady(); }
void Epoll1EventHandle::SetHasError() { error_closure_->SetReady(); }
absl::Status Epoll1Poller::Work(grpc_core::Timestamp deadline,
std::vector<EventHandle*>& pending_events) {
// Polls the registered Fds for events until the timeout is reached or there
// is a Kick(). If there is a Kick(), it returns any previously unprocessed
// events. If there are no unprocessed events, it returns Poller::Kicked{}.
Poller::WorkResult Epoll1Poller::Work(EventEngine::Duration timeout) {
Poller::Events pending_events;
if (g_epoll_set_.cursor == g_epoll_set_.num_events) {
auto status = DoEpollWait(deadline);
if (!status.ok()) {
return status;
if (DoEpollWait(timeout) == 0) {
return Poller::DeadlineExceeded{};
}
}
{
absl::MutexLock lock(&mu_);
// If was_kicked_ is true, collect all pending events in this iteration.
auto status = ProcessEpollEvents(
was_kicked_ ? INT_MAX : MAX_EPOLL_EVENTS_HANDLED_PER_ITERATION,
pending_events);
if (!status.ok()) {
if (ProcessEpollEvents(
was_kicked_ ? INT_MAX : MAX_EPOLL_EVENTS_HANDLED_PER_ITERATION,
pending_events)) {
was_kicked_ = false;
}
return status;
if (pending_events.empty()) {
return Poller::Kicked{};
}
return pending_events;
}
}
@ -534,6 +572,9 @@ Epoll1Poller* GetEpoll1Poller(Scheduler* scheduler) {
namespace grpc_event_engine {
namespace posix_engine {
using ::grpc_event_engine::experimental::EventEngine;
using ::grpc_event_engine::experimental::Poller;
Epoll1Poller::Epoll1Poller(Scheduler* /* engine */) {
GPR_ASSERT(false && "unimplemented");
}
@ -547,18 +588,16 @@ EventHandle* Epoll1Poller::CreateHandle(int /*fd*/, absl::string_view /*name*/,
GPR_ASSERT(false && "unimplemented");
}
absl::Status Epoll1Poller::ProcessEpollEvents(
int /*max_epoll_events_to_handle*/,
std::vector<EventHandle*>& /*pending_events*/) {
bool Epoll1Poller::ProcessEpollEvents(int /*max_epoll_events_to_handle*/,
Poller::Events& /*pending_events*/) {
GPR_ASSERT(false && "unimplemented");
}
absl::Status Epoll1Poller::DoEpollWait(grpc_core::Timestamp /*deadline*/) {
int Epoll1Poller::DoEpollWait(EventEngine::Duration /*timeout*/) {
GPR_ASSERT(false && "unimplemented");
}
absl::Status Epoll1Poller::Work(grpc_core::Timestamp /*deadline*/,
std::vector<EventHandle*>& /*pending_events*/) {
Poller::WorkResult Epoll1Poller::Work(EventEngine::Duration /*timeout*/) {
GPR_ASSERT(false && "unimplemented");
}
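
Note: the ev_epoll1_linux.cc hunks above change Work() to take an EventEngine::Duration and to return a Poller::WorkResult instead of filling a vector of handles. The sketch below shows how a polling thread might drive that API; it assumes WorkResult behaves like an absl::variant over Poller::Events (a list of EventEngine::Closure*), Poller::Kicked and Poller::DeadlineExceeded, which the returns above suggest but this diff does not show. PollOnce and the 100ms timeout are illustrative, not part of the change.

#include <chrono>
#include "absl/types/variant.h"
#include "src/core/lib/event_engine/poller.h"

namespace {
using ::grpc_event_engine::experimental::Poller;

// Run one Work() iteration and execute whatever closures the poller handed
// back. Returns false once the poller was kicked so the caller can stop.
bool PollOnce(Poller* poller) {
  Poller::WorkResult result = poller->Work(std::chrono::milliseconds(100));
  if (absl::holds_alternative<Poller::Kicked>(result)) {
    return false;  // Another thread called Kick() to break us out.
  }
  if (absl::holds_alternative<Poller::DeadlineExceeded>(result)) {
    return true;   // Timed out with nothing actionable; poll again.
  }
  // Otherwise the result holds the closures for fds that became readable,
  // writable or errored; the caller is responsible for running them.
  for (auto* closure : absl::get<Poller::Events>(result)) {
    closure->Run();
  }
  return true;
}
}  // namespace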

40
src/core/lib/event_engine/posix_engine/ev_epoll1_linux.h

@ -18,16 +18,17 @@
#include <list>
#include <memory>
#include <vector>
#include <string>
#include "absl/base/thread_annotations.h"
#include "absl/status/status.h"
#include "absl/strings/string_view.h"
#include "absl/synchronization/mutex.h"
#include <grpc/event_engine/event_engine.h>
#include "src/core/lib/event_engine/poller.h"
#include "src/core/lib/event_engine/posix_engine/event_poller.h"
#include "src/core/lib/event_engine/posix_engine/wakeup_fd_posix.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/iomgr/port.h"
#ifdef GRPC_LINUX_EPOLL
@ -42,26 +43,41 @@ namespace posix_engine {
class Epoll1EventHandle;
// Definition of epoll1 based poller.
class Epoll1Poller : public EventPoller {
class Epoll1Poller : public PosixEventPoller {
public:
explicit Epoll1Poller(Scheduler* scheduler);
EventHandle* CreateHandle(int fd, absl::string_view name,
bool track_err) override;
absl::Status Work(grpc_core::Timestamp deadline,
std::vector<EventHandle*>& pending_events) override;
Poller::WorkResult Work(
grpc_event_engine::experimental::EventEngine::Duration timeout) override;
std::string Name() override { return "epoll1"; }
void Kick() override;
Scheduler* GetScheduler() { return scheduler_; }
void Shutdown() override;
~Epoll1Poller() override;
private:
absl::Status ProcessEpollEvents(int max_epoll_events_to_handle,
std::vector<EventHandle*>& pending_events);
absl::Status DoEpollWait(grpc_core::Timestamp deadline);
struct HandlesList {
// Process the epoll events found by DoEpollWait() function.
// - g_epoll_set.cursor points to the index of the first event to be processed
// - This function then processes up to max_epoll_events_to_handle events and
// updates the g_epoll_set.cursor.
// It returns true if there was a Kick that forced invocation of this
// function. It also returns the list of closures to run to take action
// on file descriptors that became readable/writable.
bool ProcessEpollEvents(int max_epoll_events_to_handle,
Poller::Events& pending_events);
// Do epoll_wait and store the events in g_epoll_set.events field. This does
// not "process" any of the events yet; that is done in ProcessEpollEvents().
// See ProcessEpollEvents() function for more details. It returns the number
// of events generated by epoll_wait.
int DoEpollWait(
grpc_event_engine::experimental::EventEngine::Duration timeout);
class HandlesList {
public:
explicit HandlesList(Epoll1EventHandle* handle) : handle(handle) {}
Epoll1EventHandle* handle;
Epoll1EventHandle* next;
Epoll1EventHandle* prev;
Epoll1EventHandle* next = nullptr;
Epoll1EventHandle* prev = nullptr;
};
friend class Epoll1EventHandle;
#ifdef GRPC_LINUX_EPOLL
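
Note: the header above documents the two-phase design: DoEpollWait() fills g_epoll_set.events, and ProcessEpollEvents() drains at most max_epoll_events_to_handle of them per Work() call, remembering its position in g_epoll_set.cursor. The snippet below is a stand-alone, Linux-only restatement of that cursor pattern with hypothetical names; it is not the PR's code.

#include <errno.h>
#include <sys/epoll.h>
#include <algorithm>

struct EventBatch {
  static constexpr int kMaxEvents = 100;
  epoll_event events[kMaxEvents];
  int num_events = 0;  // How many events the last epoll_wait() returned.
  int cursor = 0;      // Index of the next unprocessed event.
};

// Phase 1: wait only when the previous batch is fully consumed.
inline void FillBatchIfDrained(int epfd, EventBatch& batch, int timeout_ms) {
  if (batch.cursor < batch.num_events) return;
  int r;
  do {
    r = epoll_wait(epfd, batch.events, EventBatch::kMaxEvents, timeout_ms);
  } while (r < 0 && errno == EINTR);
  batch.num_events = std::max(r, 0);
  batch.cursor = 0;
}

// Phase 2: hand back at most `limit` events; the rest stay buffered for the
// next call, which is what bounds the work done per Work() invocation.
template <typename Fn>
inline void DrainBatch(EventBatch& batch, int limit, Fn&& on_event) {
  for (int handled = 0;
       handled < limit && batch.cursor < batch.num_events; ++handled) {
    on_event(batch.events[batch.cursor++]);
  }
}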

255
src/core/lib/event_engine/posix_engine/ev_poll_posix.cc

@ -19,20 +19,22 @@
#include <stdint.h>
#include <stdlib.h>
#include <algorithm>
#include <atomic>
#include <list>
#include <memory>
#include <utility>
#include "absl/functional/any_invocable.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/str_cat.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/impl/codegen/gpr_types.h>
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
#include <grpc/support/time.h>
#include "src/core/lib/event_engine/poller.h"
#include "src/core/lib/event_engine/posix_engine/event_poller.h"
#include "src/core/lib/event_engine/posix_engine/posix_engine_closure.h"
#include "src/core/lib/gprpp/memory.h"
@ -47,22 +49,20 @@
#include <sys/socket.h>
#include <unistd.h>
#include <vector>
#include "absl/synchronization/mutex.h"
#include <grpc/support/alloc.h>
#include "src/core/lib/event_engine/common_closures.h"
#include "src/core/lib/event_engine/posix_engine/wakeup_fd_posix.h"
#include "src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.h"
#include "src/core/lib/event_engine/time_util.h"
#include "src/core/lib/gprpp/fork.h"
#include "src/core/lib/gprpp/global_config.h"
#include "src/core/lib/gprpp/time.h"
GPR_GLOBAL_CONFIG_DECLARE_STRING(grpc_poll_strategy);
using ::grpc_event_engine::posix_engine::WakeupFd;
static const intptr_t kClosureNotReady = 0;
static const intptr_t kClosureReady = 1;
static const int kPollinCheck = POLLIN | POLLHUP | POLLERR;
@ -71,13 +71,18 @@ static const int kPolloutCheck = POLLOUT | POLLHUP | POLLERR;
namespace grpc_event_engine {
namespace posix_engine {
using ::grpc_event_engine::experimental::AnyInvocableClosure;
using ::grpc_event_engine::experimental::EventEngine;
using ::grpc_event_engine::experimental::Poller;
using ::grpc_event_engine::posix_engine::WakeupFd;
class PollEventHandle : public EventHandle {
public:
PollEventHandle(int fd, PollPoller* poller)
: fd_(fd),
pending_actions_(0),
fork_fd_list_(),
poller_handles_list_(),
fork_fd_list_(this),
poller_handles_list_(this),
poller_(poller),
scheduler_(poller->GetScheduler()),
is_orphaned_(false),
@ -87,20 +92,29 @@ class PollEventHandle : public EventHandle {
pollhup_(false),
watch_mask_(-1),
shutdown_error_(absl::OkStatus()),
exec_actions_closure_([this]() { ExecutePendingActions(); }),
on_done_(nullptr),
read_closure_(reinterpret_cast<IomgrEngineClosure*>(kClosureNotReady)),
read_closure_(reinterpret_cast<PosixEngineClosure*>(kClosureNotReady)),
write_closure_(
reinterpret_cast<IomgrEngineClosure*>(kClosureNotReady)) {
reinterpret_cast<PosixEngineClosure*>(kClosureNotReady)) {
poller_->Ref();
absl::MutexLock lock(&poller_->mu_);
poller_->PollerHandlesListAddHandle(this);
}
PollPoller* Poller() { return poller_; }
void SetPendingActions(bool pending_read, bool pending_write) {
EventEngine::Closure* SetPendingActions(bool pending_read,
bool pending_write) {
pending_actions_ |= pending_read;
if (pending_write) {
pending_actions_ |= (1 << 2);
}
if (pending_read || pending_write) {
// The closure is going to be executed. We'll Unref this handle in
// ExecutePendingActions.
Ref();
return &exec_actions_closure_;
}
return nullptr;
}
void ForceRemoveHandleFromPoller() {
absl::MutexLock lock(&poller_->mu_);
@ -130,12 +144,12 @@ class PollEventHandle : public EventHandle {
void SetWatched(int watch_mask) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
watch_mask_ = watch_mask;
}
void OrphanHandle(IomgrEngineClosure* on_done, int* release_fd,
void OrphanHandle(PosixEngineClosure* on_done, int* release_fd,
absl::string_view reason) override;
void ShutdownHandle(absl::Status why) override;
void NotifyOnRead(IomgrEngineClosure* on_read) override;
void NotifyOnWrite(IomgrEngineClosure* on_write) override;
void NotifyOnError(IomgrEngineClosure* on_error) override;
void NotifyOnRead(PosixEngineClosure* on_read) override;
void NotifyOnWrite(PosixEngineClosure* on_write) override;
void NotifyOnError(PosixEngineClosure* on_error) override;
void SetReadable() override;
void SetWritable() override;
void SetHasError() override;
@ -143,7 +157,7 @@ class PollEventHandle : public EventHandle {
absl::MutexLock lock(&mu_);
return is_shutdown_;
};
void ExecutePendingActions() override {
inline void ExecutePendingActions() {
int kick = 0;
{
absl::MutexLock lock(&mu_);
@ -161,11 +175,11 @@ class PollEventHandle : public EventHandle {
}
if (kick) {
// SetReadyLocked immediately scheduled some closure. It would have set
// the closure state to NOT_READY. We need to wakeup the Work(...) thread
// to start polling on this fd. If this call is not made, it is possible
// that the poller will reach a state where all the fds under the
// poller's control are not polled for POLLIN/POLLOUT events thus leading
// to an indefinitely blocked Work(..) method.
// the closure state to NOT_READY. We need to wakeup the Work(...)
// thread to start polling on this fd. If this call is not made, it is
// possible that the poller will reach a state where all the fds under
// the poller's control are not polled for POLLIN/POLLOUT events thus
// leading to an indefinitely blocked Work(..) method.
poller_->KickExternal(false);
}
Unref();
@ -188,12 +202,12 @@ class PollEventHandle : public EventHandle {
}
uint32_t BeginPollLocked(uint32_t read_mask, uint32_t write_mask)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
bool EndPollLocked(int got_read, int got_write)
EventEngine::Closure* EndPollLocked(int got_read, int got_write)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
private:
int SetReadyLocked(IomgrEngineClosure** st);
int NotifyOnLocked(IomgrEngineClosure** st, IomgrEngineClosure* closure);
int SetReadyLocked(PosixEngineClosure** st);
int NotifyOnLocked(PosixEngineClosure** st, PosixEngineClosure* closure);
// See Epoll1Poller::ShutdownHandle for explanation on why a mutex is
// required.
absl::Mutex mu_;
@ -211,13 +225,13 @@ class PollEventHandle : public EventHandle {
bool pollhup_;
int watch_mask_;
absl::Status shutdown_error_;
IomgrEngineClosure* on_done_;
IomgrEngineClosure* read_closure_;
IomgrEngineClosure* write_closure_;
AnyInvocableClosure exec_actions_closure_;
PosixEngineClosure* on_done_;
PosixEngineClosure* read_closure_;
PosixEngineClosure* write_closure_;
};
namespace {
// Only used when GRPC_ENABLE_FORK_SUPPORT=1
std::list<PollPoller*> fork_poller_list;
@ -272,11 +286,12 @@ void ForkPollerListRemovePoller(PollPoller* poller) {
}
}
int PollDeadlineToMillisTimeout(grpc_core::Timestamp millis) {
if (millis == grpc_core::Timestamp::InfFuture()) return -1;
// Returns the number of milliseconds elapsed between now and start timestamp.
int PollElapsedTimeToMillis(grpc_core::Timestamp start) {
if (start == grpc_core::Timestamp::InfFuture()) return -1;
grpc_core::Timestamp now =
grpc_core::Timestamp::FromTimespecRoundDown(gpr_now(GPR_CLOCK_MONOTONIC));
int64_t delta = (millis - now).millis();
int64_t delta = (now - start).millis();
if (delta > INT_MAX) {
return INT_MAX;
} else if (delta < 0) {
@ -289,9 +304,9 @@ int PollDeadlineToMillisTimeout(grpc_core::Timestamp millis) {
bool InitPollPollerPosix();
// Called by the child process's post-fork handler to close open fds,
// including the global epoll fd of each poller. This allows gRPC to shutdown in
// the child process without interfering with connections or RPCs ongoing in the
// parent.
// including the global epoll fd of each poller. This allows gRPC to shutdown
// in the child process without interfering with connections or RPCs ongoing
// in the parent.
void ResetEventManagerOnFork() {
// Delete all pending Epoll1EventHandles.
gpr_mu_lock(&fork_fd_list_mu);
@ -338,10 +353,13 @@ EventHandle* PollPoller::CreateHandle(int fd, absl::string_view /*name*/,
GPR_DEBUG_ASSERT(track_err == false);
PollEventHandle* handle = new PollEventHandle(fd, this);
ForkFdListAddHandle(handle);
// We need to send a kick to the thread executing Work(..) so that it can
// add this new Fd into the list of Fds to poll.
KickExternal(false);
return handle;
}
void PollEventHandle::OrphanHandle(IomgrEngineClosure* on_done, int* release_fd,
void PollEventHandle::OrphanHandle(PosixEngineClosure* on_done, int* release_fd,
absl::string_view /* reason */) {
ForkFdListRemoveHandle(this);
ForceRemoveHandleFromPoller();
@ -358,9 +376,11 @@ void PollEventHandle::OrphanHandle(IomgrEngineClosure* on_done, int* release_fd,
if (!is_shutdown_) {
is_shutdown_ = true;
shutdown_error_ =
absl::Status(absl::StatusCode::kInternal, "FD Shutdown");
absl::Status(absl::StatusCode::kInternal, "FD Orphaned");
// signal read/write closed to OS so that future operations fail.
shutdown(fd_, SHUT_RDWR);
if (!released_) {
shutdown(fd_, SHUT_RDWR);
}
SetReadyLocked(&read_closure_);
SetReadyLocked(&write_closure_);
}
@ -378,18 +398,18 @@ void PollEventHandle::OrphanHandle(IomgrEngineClosure* on_done, int* release_fd,
Unref();
}
int PollEventHandle::NotifyOnLocked(IomgrEngineClosure** st,
IomgrEngineClosure* closure) {
int PollEventHandle::NotifyOnLocked(PosixEngineClosure** st,
PosixEngineClosure* closure) {
if (is_shutdown_ || pollhup_) {
closure->SetStatus(
absl::Status(absl::StatusCode::kInternal, "FD Shutdown"));
closure->SetStatus(shutdown_error_);
scheduler_->Run(closure);
} else if (*st == reinterpret_cast<IomgrEngineClosure*>(kClosureNotReady)) {
} else if (*st == reinterpret_cast<PosixEngineClosure*>(kClosureNotReady)) {
// not ready ==> switch to a waiting state by setting the closure
*st = closure;
} else if (*st == reinterpret_cast<IomgrEngineClosure*>(kClosureReady)) {
return 0;
} else if (*st == reinterpret_cast<PosixEngineClosure*>(kClosureReady)) {
// already ready ==> queue the closure to run immediately
*st = reinterpret_cast<IomgrEngineClosure*>(kClosureNotReady);
*st = reinterpret_cast<PosixEngineClosure*>(kClosureNotReady);
closure->SetStatus(shutdown_error_);
scheduler_->Run(closure);
return 1;
@ -404,18 +424,18 @@ int PollEventHandle::NotifyOnLocked(IomgrEngineClosure** st,
}
// returns 1 if state becomes not ready
int PollEventHandle::SetReadyLocked(IomgrEngineClosure** st) {
if (*st == reinterpret_cast<IomgrEngineClosure*>(kClosureReady)) {
int PollEventHandle::SetReadyLocked(PosixEngineClosure** st) {
if (*st == reinterpret_cast<PosixEngineClosure*>(kClosureReady)) {
// duplicate ready ==> ignore
return 0;
} else if (*st == reinterpret_cast<IomgrEngineClosure*>(kClosureNotReady)) {
} else if (*st == reinterpret_cast<PosixEngineClosure*>(kClosureNotReady)) {
// not ready, and not waiting ==> flag ready
*st = reinterpret_cast<IomgrEngineClosure*>(kClosureReady);
*st = reinterpret_cast<PosixEngineClosure*>(kClosureReady);
return 0;
} else {
// waiting ==> queue closure
IomgrEngineClosure* closure = *st;
*st = reinterpret_cast<IomgrEngineClosure*>(kClosureNotReady);
PosixEngineClosure* closure = *st;
*st = reinterpret_cast<PosixEngineClosure*>(kClosureNotReady);
closure->SetStatus(shutdown_error_);
scheduler_->Run(closure);
return 1;
@ -423,8 +443,8 @@ int PollEventHandle::SetReadyLocked(IomgrEngineClosure** st) {
}
void PollEventHandle::ShutdownHandle(absl::Status why) {
// We need to take a Ref here because SetReadyLocked may trigger execution of
// a closure which calls OrphanHandle or poller->Shutdown() prematurely.
// We need to take a Ref here because SetReadyLocked may trigger execution
// of a closure which calls OrphanHandle or poller->Shutdown() prematurely.
Ref();
{
absl::MutexLock lock(&mu_);
@ -442,9 +462,9 @@ void PollEventHandle::ShutdownHandle(absl::Status why) {
Unref();
}
void PollEventHandle::NotifyOnRead(IomgrEngineClosure* on_read) {
// We need to take a Ref here because NotifyOnLocked may trigger execution of
// a closure which calls OrphanHandle that may delete this object or call
void PollEventHandle::NotifyOnRead(PosixEngineClosure* on_read) {
// We need to take a Ref here because NotifyOnLocked may trigger execution
// of a closure which calls OrphanHandle that may delete this object or call
// poller->Shutdown() prematurely.
Ref();
{
@ -464,9 +484,9 @@ void PollEventHandle::NotifyOnRead(IomgrEngineClosure* on_read) {
Unref();
}
void PollEventHandle::NotifyOnWrite(IomgrEngineClosure* on_write) {
// We need to take a Ref here because NotifyOnLocked may trigger execution of
// a closure which calls OrphanHandle that may delete this object or call
void PollEventHandle::NotifyOnWrite(PosixEngineClosure* on_write) {
// We need to take a Ref here because NotifyOnLocked may trigger execution
// of a closure which calls OrphanHandle that may delete this object or call
// poller->Shutdown() prematurely.
Ref();
{
@ -486,7 +506,7 @@ void PollEventHandle::NotifyOnWrite(IomgrEngineClosure* on_write) {
Unref();
}
void PollEventHandle::NotifyOnError(IomgrEngineClosure* on_error) {
void PollEventHandle::NotifyOnError(PosixEngineClosure* on_error) {
on_error->SetStatus(
absl::Status(absl::StatusCode::kCancelled,
"Polling engine does not support tracking errors"));
@ -519,31 +539,35 @@ uint32_t PollEventHandle::BeginPollLocked(uint32_t read_mask,
bool read_ready = (pending_actions_ & 1UL);
bool write_ready = ((pending_actions_ >> 2) & 1UL);
Ref();
// if we are shutdown, then no need to poll this fd. Set watch_mask to 0.
// If we are shutdown, then no need to poll this fd. Set watch_mask to 0.
if (is_shutdown_) {
SetWatched(0);
return 0;
}
// if there is nobody polling for read, but we need to, then start doing so
// If there is nobody polling for read, but we need to, then start doing so.
if (read_mask && !read_ready &&
read_closure_ != reinterpret_cast<IomgrEngineClosure*>(kClosureReady)) {
read_closure_ != reinterpret_cast<PosixEngineClosure*>(kClosureReady)) {
mask |= read_mask;
}
// if there is nobody polling for write, but we need to, then start doing so
// If there is nobody polling for write, but we need to, then start doing so
if (write_mask && !write_ready &&
write_closure_ != reinterpret_cast<IomgrEngineClosure*>(kClosureReady)) {
write_closure_ != reinterpret_cast<PosixEngineClosure*>(kClosureReady)) {
mask |= write_mask;
}
SetWatched(mask);
return mask;
}
bool PollEventHandle::EndPollLocked(int got_read, int got_write) {
SetPendingActions(got_read, got_write);
EventEngine::Closure* PollEventHandle::EndPollLocked(int got_read,
int got_write) {
EventEngine::Closure* closure = nullptr;
if (is_orphaned_ && !IsWatched()) {
CloseFd();
} else if (!is_orphaned_) {
closure = SetPendingActions(got_read, got_write);
}
return !is_orphaned_ && (got_read || got_write);
return closure;
}
void PollPoller::KickExternal(bool ext) {
@ -573,7 +597,7 @@ void PollPoller::PollerHandlesListAddHandle(PollEventHandle* handle) {
void PollPoller::PollerHandlesListRemoveHandle(PollEventHandle* handle) {
if (poll_handles_list_head_ == handle) {
poll_handles_list_head_ = handle->ForkFdListPos().next;
poll_handles_list_head_ = handle->PollerHandlesListPos().next;
}
if (handle->PollerHandlesListPos().prev != nullptr) {
handle->PollerHandlesListPos().prev->PollerHandlesListPos().next =
@ -617,33 +641,28 @@ PollPoller::~PollPoller() {
GPR_ASSERT(poll_handles_list_head_ == nullptr);
}
absl::Status PollPoller::Work(grpc_core::Timestamp deadline,
std::vector<EventHandle*>& pending_events) {
absl::Status error = absl::OkStatus();
Poller::WorkResult PollPoller::Work(EventEngine::Duration timeout) {
// Avoid malloc for small number of elements.
enum { inline_elements = 96 };
struct pollfd pollfd_space[inline_elements];
bool was_kicked_ext = false;
PollEventHandle* watcher_space[inline_elements];
Poller::Events pending_events;
int timeout_ms =
static_cast<int>(grpc_event_engine::experimental::Milliseconds(timeout));
mu_.Lock();
if (std::exchange(was_kicked_, false) &&
std::exchange(was_kicked_ext_, false)) {
// External kick. Need to break out.
mu_.Unlock();
return absl::Status(absl::StatusCode::kInternal, "Kicked");
}
// Start polling, and keep doing so while we're being asked to
// re-evaluate our pollers (this allows poll() based pollers to
// ensure they don't miss wakeups).
while (error.ok() && pending_events.empty() &&
deadline > grpc_core::Timestamp::FromTimespecRoundDown(
gpr_now(GPR_CLOCK_MONOTONIC))) {
int timeout = PollDeadlineToMillisTimeout(deadline);
while (pending_events.empty() && timeout_ms >= 0) {
int r = 0;
size_t i;
nfds_t pfd_count;
struct pollfd* pfds;
PollEventHandle** watchers;
// Estimate start time for a poll iteration.
grpc_core::Timestamp start = grpc_core::Timestamp::FromTimespecRoundDown(
gpr_now(GPR_CLOCK_MONOTONIC));
if (num_poll_handles_ + 2 <= inline_elements) {
pfds = pollfd_space;
watchers = watcher_space;
@ -686,8 +705,8 @@ absl::Status PollPoller::Work(grpc_core::Timestamp deadline,
}
mu_.Unlock();
if (!use_phony_poll_ || timeout == 0) {
r = poll(pfds, pfd_count, timeout);
if (!use_phony_poll_ || timeout_ms == 0) {
r = poll(pfds, pfd_count, timeout_ms);
} else {
gpr_log(GPR_ERROR,
"Attempted a blocking poll when declared non-polling.");
@ -696,9 +715,11 @@ absl::Status PollPoller::Work(grpc_core::Timestamp deadline,
if (r <= 0) {
if (r < 0 && errno != EINTR) {
// Save the error code
error = absl::Status(absl::StatusCode::kInternal,
absl::StrCat("poll: ", strerror(errno)));
// Fail hard here on unexpected poll errors.
gpr_log(GPR_ERROR,
"(event_engine) PollPoller:%p encountered poll error: %s", this,
strerror(errno));
GPR_ASSERT(false);
}
for (i = 1; i < pfd_count; i++) {
@ -709,25 +730,22 @@ absl::Status PollPoller::Work(grpc_core::Timestamp deadline,
head->SetWatched(-1);
// This fd was Watched with a watch mask > 0.
if (watch_mask > 0 && r < 0) {
// This case implies the fd was polled (since watch_mask > 0 and the
// poll returned an error. Mark the fds as both readable and
// This case implies the fd was polled (since watch_mask > 0) and
// the poll returned an error. Mark the fds as both readable and
// writable.
if (head->EndPollLocked(1, 1)) {
// Its safe to add to list of pending events because EndPollLocked
// returns a +ve number only when the handle is not orphaned.
// But an orphan might be initiated on the handle after this
// Work() method returns and before the next Work() method is
// invoked. To prevent the handle from being destroyed until the
// pending events are processed, take a Ref() of the handle. This
// Ref() will be Unref'ed in ExecutePendingActions.
head->Ref();
pending_events.push_back(head);
if (EventEngine::Closure* closure = head->EndPollLocked(1, 1)) {
// It's safe to add to the list of pending events because
// EndPollLocked returns a non-null closure only when the handle is
// not orphaned. But an orphan might be initiated on the handle
// after this Work() method returns and before the next Work()
// method is invoked.
pending_events.push_back(closure);
}
} else {
// In this case, (1) watch_mask > 0 && r == 0 or (2) watch_mask == 0
// and r < 0 or (3) watch_mask == 0 and r == 0.
// For case-1, no events are pending on the fd even though the fd
// was polled. For case-2 and 3, the fd was not polled
// In this case, (1) watch_mask > 0 && r == 0 or (2) watch_mask ==
// 0 and r < 0 or (3) watch_mask == 0 and r == 0. For case-1, no
// events are pending on the fd even though the fd was polled. For
// case-2 and 3, the fd was not polled
head->EndPollLocked(0, 0);
}
} else {
@ -759,17 +777,15 @@ absl::Status PollPoller::Work(grpc_core::Timestamp deadline,
head->SetPollhup(true);
}
head->SetWatched(-1);
if (head->EndPollLocked(pfds[i].revents & kPollinCheck,
pfds[i].revents & kPolloutCheck)) {
if (EventEngine::Closure* closure =
head->EndPollLocked(pfds[i].revents & kPollinCheck,
pfds[i].revents & kPolloutCheck)) {
// It's safe to add to the list of pending events because EndPollLocked
// returns a non-null closure only when the handle is not orphaned.
// But an orphan might be initiated on the handle after this
// Work() method returns and before the next Work() method is
// invoked. To prevent the handle from being destroyed until the
// pending events are processed, take a Ref() of the handle. This
// Ref() will be Unref'ed in ExecutePendingActions.
head->Ref();
pending_events.push_back(head);
// invoked.
pending_events.push_back(closure);
}
}
lock.Release();
@ -781,16 +797,25 @@ absl::Status PollPoller::Work(grpc_core::Timestamp deadline,
if (pfds != pollfd_space) {
gpr_free(pfds);
}
// End of poll iteration. Update how much time is remaining.
timeout_ms -= PollElapsedTimeToMillis(start);
mu_.Lock();
if (std::exchange(was_kicked_, false) &&
std::exchange(was_kicked_ext_, false)) {
// External kick. Need to break out.
error = absl::Status(absl::StatusCode::kInternal, "Kicked");
was_kicked_ext = true;
break;
}
}
mu_.Unlock();
return error;
if (pending_events.empty()) {
if (was_kicked_ext) {
return Poller::Kicked{};
}
return Poller::DeadlineExceeded{};
}
return pending_events;
}
void PollPoller::Shutdown() {
@ -814,6 +839,9 @@ PollPoller* GetPollPoller(Scheduler* scheduler, bool use_phony_poll) {
namespace grpc_event_engine {
namespace posix_engine {
using ::grpc_event_engine::experimental::EventEngine;
using ::grpc_event_engine::experimental::Poller;
PollPoller::PollPoller(Scheduler* /* engine */) {
GPR_ASSERT(false && "unimplemented");
}
@ -827,8 +855,7 @@ EventHandle* PollPoller::CreateHandle(int /*fd*/, absl::string_view /*name*/,
GPR_ASSERT(false && "unimplemented");
}
absl::Status PollPoller::Work(grpc_core::Timestamp /*deadline*/,
std::vector<EventHandle*>& /*pending_events*/) {
Poller::WorkResult PollPoller::Work(EventEngine::Duration /*timeout*/) {
GPR_ASSERT(false && "unimplemented");
}
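
Note: the poll()-based Work() above replaces the old absolute deadline with a millisecond budget: the EventEngine::Duration is converted once via Milliseconds(), and each iteration subtracts the time it actually spent (PollElapsedTimeToMillis), so a wakeup that produces no actionable closures simply loops again with the remaining budget; this is how spurious wakeups are allowed. Below is a self-contained sketch of the same bookkeeping, with ToMillis standing in for the time_util helper and all other names illustrative.

#include <errno.h>
#include <poll.h>
#include <chrono>
#include <cstdint>

// Stand-in for grpc_event_engine::experimental::Milliseconds().
inline int64_t ToMillis(std::chrono::nanoseconds d) {
  return std::chrono::duration_cast<std::chrono::milliseconds>(d).count();
}

// Poll until events arrive, a hard error occurs, or the time budget runs out.
inline int PollWithBudget(pollfd* fds, nfds_t nfds,
                          std::chrono::nanoseconds timeout) {
  int budget_ms = static_cast<int>(ToMillis(timeout));
  while (budget_ms >= 0) {
    auto start = std::chrono::steady_clock::now();
    int r = poll(fds, nfds, budget_ms);
    if (r > 0) return r;           // Real events: stop looping.
    if (r == 0) return 0;          // poll() itself hit the timeout.
    if (errno != EINTR) return r;  // Hard error.
    // Interrupted: shrink the budget by the elapsed time and retry, which is
    // the `timeout_ms -= PollElapsedTimeToMillis(start)` step above.
    budget_ms -= static_cast<int>(
        ToMillis(std::chrono::steady_clock::now() - start));
  }
  return 0;  // Budget exhausted: the caller treats this as DeadlineExceeded.
}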

27
src/core/lib/event_engine/posix_engine/ev_poll_posix.h

@ -19,16 +19,17 @@
#include <atomic>
#include <memory>
#include <vector>
#include <string>
#include "absl/base/thread_annotations.h"
#include "absl/status/status.h"
#include "absl/strings/string_view.h"
#include "absl/synchronization/mutex.h"
#include <grpc/event_engine/event_engine.h>
#include "src/core/lib/event_engine/poller.h"
#include "src/core/lib/event_engine/posix_engine/event_poller.h"
#include "src/core/lib/event_engine/posix_engine/wakeup_fd_posix.h"
#include "src/core/lib/gprpp/time.h"
namespace grpc_event_engine {
namespace posix_engine {
@ -36,14 +37,15 @@ namespace posix_engine {
class PollEventHandle;
// Definition of poll based poller.
class PollPoller : public EventPoller {
class PollPoller : public PosixEventPoller {
public:
explicit PollPoller(Scheduler* scheduler);
PollPoller(Scheduler* scheduler, bool use_phony_poll);
EventHandle* CreateHandle(int fd, absl::string_view name,
bool track_err) override;
absl::Status Work(grpc_core::Timestamp deadline,
std::vector<EventHandle*>& pending_events) override;
Poller::WorkResult Work(
grpc_event_engine::experimental::EventEngine::Duration timeout) override;
std::string Name() override { return "poll"; }
void Kick() override;
Scheduler* GetScheduler() { return scheduler_; }
void Shutdown() override;
@ -62,10 +64,12 @@ class PollPoller : public EventPoller {
void PollerHandlesListRemoveHandle(PollEventHandle* handle)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
friend class PollEventHandle;
struct HandlesList {
class HandlesList {
public:
explicit HandlesList(PollEventHandle* handle) : handle(handle) {}
PollEventHandle* handle;
PollEventHandle* next;
PollEventHandle* prev;
PollEventHandle* next = nullptr;
PollEventHandle* prev = nullptr;
};
absl::Mutex mu_;
Scheduler* scheduler_;
@ -79,8 +83,9 @@ class PollPoller : public EventPoller {
};
// Return an instance of a poll based poller tied to the specified scheduler.
// It use_phony_poll is true, it implies that the poller is declared non-polling
// and any attempt to schedule a blocking poll will result in a crash failure.
// If use_phony_poll is true, it implies that the poller is declared
// non-polling and any attempt to schedule a blocking poll will result in a
// crash failure.
PollPoller* GetPollPoller(Scheduler* scheduler, bool use_phony_poll);
} // namespace posix_engine
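
Note: HandlesList above turned from a plain struct into a class whose constructor records the owning handle, so each PollEventHandle/Epoll1EventHandle carries its own intrusive list node (see `fork_fd_list_(this)` and `poller_handles_list_(this)` in the .cc hunks). The fragment below is a generic illustration of that intrusive-list idiom with placeholder names, loosely mirroring PollerHandlesListAddHandle/PollerHandlesListRemoveHandle; it is not the PR's code.

struct Handle;  // Stand-in for PollEventHandle / Epoll1EventHandle.

struct ListNode {
  explicit ListNode(Handle* h) : handle(h) {}
  Handle* handle;
  Handle* next = nullptr;
  Handle* prev = nullptr;
};

struct Handle {
  ListNode node{this};  // Mirrors `poller_handles_list_(this)`.
};

// Push a handle at the head of the poller's list.
inline void ListAdd(Handle*& head, Handle* h) {
  h->node.next = head;
  h->node.prev = nullptr;
  if (head != nullptr) head->node.prev = h;
  head = h;
}

// Unlink a handle, as PollerHandlesListRemoveHandle() does above.
inline void ListRemove(Handle*& head, Handle* h) {
  if (head == h) head = h->node.next;
  if (h->node.prev != nullptr) h->node.prev->node.next = h->node.next;
  if (h->node.next != nullptr) h->node.next->node.prev = h->node.prev;
  h->node.next = h->node.prev = nullptr;
}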

59
src/core/lib/event_engine/posix_engine/event_poller.h

@ -16,15 +16,16 @@
#define GRPC_CORE_LIB_EVENT_ENGINE_POSIX_ENGINE_EVENT_POLLER_H
#include <grpc/support/port_platform.h>
#include <vector>
#include <string>
#include "absl/functional/any_invocable.h"
#include "absl/status/status.h"
#include "absl/strings/string_view.h"
#include <grpc/event_engine/event_engine.h>
#include "src/core/lib/event_engine/poller.h"
#include "src/core/lib/event_engine/posix_engine/posix_engine_closure.h"
#include "src/core/lib/gprpp/time.h"
namespace grpc_event_engine {
namespace posix_engine {
@ -32,6 +33,7 @@ namespace posix_engine {
class Scheduler {
public:
virtual void Run(experimental::EventEngine::Closure* closure) = 0;
virtual void Run(absl::AnyInvocable<void()>) = 0;
virtual ~Scheduler() = default;
};
@ -41,21 +43,34 @@ class EventHandle {
// Delete the handle and optionally close the underlying file descriptor if
// release_fd != nullptr. The on_done closure is scheduled to be invoked
// after the operation is complete. After this operation, NotifyXXX and SetXXX
// operations cannot be performed on the handle.
virtual void OrphanHandle(IomgrEngineClosure* on_done, int* release_fd,
// operations cannot be performed on the handle. In general, this method
// should only be called after ShutdownHandle and after all existing NotifyXXX
// closures have run and there is no waiting NotifyXXX closure.
virtual void OrphanHandle(PosixEngineClosure* on_done, int* release_fd,
absl::string_view reason) = 0;
// Shutdown a handle. After this operation, NotifyXXX and SetXXX operations
// cannot be performed.
// Shutdown a handle. If there is an attempt to call NotifyXXX operations
// after ShutdownHandle, those closures will be run immediately with the
// absl::Status provided here being passed to the callbacks enclosed within
// the PosixEngineClosure object.
virtual void ShutdownHandle(absl::Status why) = 0;
// Schedule on_read to be invoked when the underlying file descriptor
// becomes readable.
virtual void NotifyOnRead(IomgrEngineClosure* on_read) = 0;
// becomes readable. When the on_read closure is run, it may check
// if the handle is shutdown using the IsHandleShutdown method and take
// appropriate actions (for instance it should not try to invoke another
// recursive NotifyOnRead if the handle is shutdown).
virtual void NotifyOnRead(PosixEngineClosure* on_read) = 0;
// Schedule on_write to be invoked when the underlying file descriptor
// becomes writable.
virtual void NotifyOnWrite(IomgrEngineClosure* on_write) = 0;
// becomes writable. When the on_write closure is run, it may check
// if the handle is shutdown using the IsHandleShutdown method and take
// appropriate actions (for instance it should not try to invoke another
// recursive NotifyOnWrite if the handle is shutdown).
virtual void NotifyOnWrite(PosixEngineClosure* on_write) = 0;
// Schedule on_error to be invoked when the underlying file descriptor
// encounters errors.
virtual void NotifyOnError(IomgrEngineClosure* on_error) = 0;
// encounters errors. When the on_error closure is run, it may check
// if the handle is shutdown using the IsHandleShutdown method and take
// appropriate actions (for instance it should not try to invoke another
// recursive NotifyOnError if the handle is shutdown).
virtual void NotifyOnError(PosixEngineClosure* on_error) = 0;
// Force set a readable event on the underlying file descriptor.
virtual void SetReadable() = 0;
// Force set a writable event on the underlying file descriptor.
@ -64,17 +79,15 @@ class EventHandle {
virtual void SetHasError() = 0;
// Returns true if the handle has been shutdown.
virtual bool IsHandleShutdown() = 0;
// Execute any pending actions that may have been set to a handle after the
// last invocation of Work(...) function.
virtual void ExecutePendingActions() = 0;
virtual ~EventHandle() = default;
};
class EventPoller {
class PosixEventPoller : public grpc_event_engine::experimental::Poller {
public:
// Return an opaque handle to perform actions on the provided file descriptor.
virtual EventHandle* CreateHandle(int fd, absl::string_view name,
bool track_err) = 0;
virtual std::string Name() = 0;
// Shuts down and deletes the poller. It is legal to call this function
// only when no other poller method is in progress. For instance, it is
// not safe to call this method while a thread is blocked on Work(...).
@ -84,19 +97,7 @@ class EventPoller {
// thread to return.
// 3. Call Shutdown() on the poller.
virtual void Shutdown() = 0;
// Poll all the underlying file descriptors for the specified period
// and return a vector containing a list of handles which have pending
// events. The calling thread should invoke ExecutePendingActions on each
// returned handle to take the necessary pending actions. Only one thread
// may invoke the Work function at any given point in time. The Work(...)
// method returns an absl Non-OK status if it was Kicked.
virtual absl::Status Work(grpc_core::Timestamp deadline,
std::vector<EventHandle*>& pending_events) = 0;
// Trigger the thread executing Work(..) to break out as soon as possible.
// This function is useful in tests. It may also be used to break a thread
// out of Work(...) before calling Shutdown() on the poller.
virtual void Kick() = 0;
virtual ~EventPoller() = default;
~PosixEventPoller() override = default;
};
} // namespace posix_engine
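
Note: a hedged usage sketch of the interfaces above: arm a read notification on a PosixEventPoller-created handle and honor the shutdown rules spelled out in the NotifyOnRead comment. The PosixEngineClosure constructor used here is the one shown in the posix_engine_closure.h hunk at the end of this diff; the fd, the name string, and the re-arm policy are placeholders.

#include "absl/status/status.h"
#include "src/core/lib/event_engine/posix_engine/event_poller.h"
#include "src/core/lib/event_engine/posix_engine/posix_engine_closure.h"

namespace {
using ::grpc_event_engine::posix_engine::EventHandle;
using ::grpc_event_engine::posix_engine::PosixEngineClosure;
using ::grpc_event_engine::posix_engine::PosixEventPoller;

void ArmRead(PosixEventPoller* poller, int fd) {
  EventHandle* handle = poller->CreateHandle(fd, "sketch", /*track_err=*/false);
  // A non-permanent closure: the poller machinery sets its status via
  // SetStatus() and runs it once when the fd becomes readable or the handle
  // is shut down.
  auto* on_read = new PosixEngineClosure(
      [handle](absl::Status status) {
        if (!status.ok() || handle->IsHandleShutdown()) {
          // Per the comments above, do not re-arm a shutdown handle.
          return;
        }
        // ... read from the fd, then re-arm with another NotifyOnRead() ...
      },
      /*is_permanent=*/false);
  handle->NotifyOnRead(on_read);
}
}  // namespace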

4
src/core/lib/event_engine/posix_engine/event_poller_posix_default.cc

@ -36,10 +36,10 @@ bool PollStrategyMatches(absl::string_view strategy, absl::string_view want) {
} // namespace
EventPoller* GetDefaultPoller(Scheduler* scheduler) {
PosixEventPoller* GetDefaultPoller(Scheduler* scheduler) {
grpc_core::UniquePtr<char> poll_strategy =
GPR_GLOBAL_CONFIG_GET(grpc_poll_strategy);
EventPoller* poller = nullptr;
PosixEventPoller* poller = nullptr;
auto strings = absl::StrSplit(poll_strategy.get(), ',');
for (auto it = strings.begin(); it != strings.end() && poller == nullptr;
it++) {

@ -20,12 +20,12 @@
namespace grpc_event_engine {
namespace posix_engine {
class EventPoller;
class PosixEventPoller;
class Scheduler;
// Return an instance of an event poller which is tied to the specified
// scheduler.
EventPoller* GetDefaultPoller(Scheduler* scheduler);
PosixEventPoller* GetDefaultPoller(Scheduler* scheduler);
} // namespace posix_engine
} // namespace grpc_event_engine
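For orientation, a sketch (not part of the patch) of how a caller obtains the default poller for a scheduler and drains one Work(...) result; it mirrors the WaitAndShutdown loop in the test further below. PollOnce is an illustrative name, and the 100ms deadline is arbitrary.

#include <chrono>

#include "absl/types/variant.h"
#include "src/core/lib/event_engine/poller.h"
#include "src/core/lib/event_engine/posix_engine/event_poller.h"
#include "src/core/lib/event_engine/posix_engine/event_poller_posix_default.h"

using grpc_event_engine::experimental::Poller;
using grpc_event_engine::posix_engine::GetDefaultPoller;
using grpc_event_engine::posix_engine::PosixEventPoller;
using grpc_event_engine::posix_engine::Scheduler;

// Sketch: drive a single iteration of the new Poller interface.
void PollOnce(Scheduler* scheduler) {
  PosixEventPoller* poller = GetDefaultPoller(scheduler);
  if (poller == nullptr) return;  // no supported poll strategy configured
  Poller::WorkResult result = poller->Work(std::chrono::milliseconds(100));
  if (absl::holds_alternative<Poller::Events>(result)) {
    auto pending_events = absl::get<Poller::Events>(result);
    for (auto it = pending_events.begin(); it != pending_events.end(); ++it) {
      (*it)->Run();  // run the closure for each handle that became ready
    }
  }
  // Otherwise the poller was kicked or the deadline expired; nothing to run.
  poller->Shutdown();  // legal here: no other poller call is in progress
}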

@ -90,7 +90,7 @@ void LockfreeEvent::DestroyEvent() {
std::memory_order_relaxed));
}
void LockfreeEvent::NotifyOn(IomgrEngineClosure* closure) {
void LockfreeEvent::NotifyOn(PosixEngineClosure* closure) {
// This load needs to be an acquire load because this can be a shutdown
// error that we might need to reference. Adding acquire semantics makes
// sure that the shutdown error has been initialized properly before us
@ -199,7 +199,7 @@ bool LockfreeEvent::SetShutdown(absl::Status shutdown_error) {
if (state_.compare_exchange_strong(curr, new_state,
std::memory_order_acq_rel,
std::memory_order_relaxed)) {
auto closure = reinterpret_cast<IomgrEngineClosure*>(curr);
auto closure = reinterpret_cast<PosixEngineClosure*>(curr);
closure->SetStatus(shutdown_error);
scheduler_->Run(closure);
return true;
@ -248,7 +248,7 @@ void LockfreeEvent::SetReady() {
// Full cas: acquire pairs with this cas' release in the event of a
// spurious set_ready; release pairs with this or the acquire in
// notify_on (or set_shutdown)
auto closure = reinterpret_cast<IomgrEngineClosure*>(curr);
auto closure = reinterpret_cast<PosixEngineClosure*>(curr);
closure->SetStatus(absl::OkStatus());
scheduler_->Run(closure);
return;

@ -51,7 +51,7 @@ class LockfreeEvent {
// received, in which case the closure would be scheduled immediately.
// If the shutdown state has already been set, then \a closure is scheduled
// with the shutdown error.
void NotifyOn(IomgrEngineClosure* closure);
void NotifyOn(PosixEngineClosure* closure);
// Sets the shutdown state. If a closure had been provided by NotifyOn and has
// not yet been scheduled, it will be scheduled with \a shutdown_error.
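An ordering sketch for LockfreeEvent (not part of the patch), mirroring the unit test further below: the closure handed to NotifyOn fires exactly once whether SetReady happens before or after NotifyOn, and fires with the shutdown error if the event is shut down first. The scheduler-based construction and the InitEvent()/DestroyEvent() pairing are assumptions taken from the surrounding code.

#include "absl/status/status.h"
#include "src/core/lib/event_engine/posix_engine/lockfree_event.h"
#include "src/core/lib/event_engine/posix_engine/posix_engine_closure.h"

using grpc_event_engine::posix_engine::LockfreeEvent;
using grpc_event_engine::posix_engine::PosixEngineClosure;
using grpc_event_engine::posix_engine::Scheduler;

void LockfreeEventSketch(Scheduler* scheduler) {
  LockfreeEvent event(scheduler);
  event.InitEvent();

  // NotifyOn first, SetReady later: SetReady schedules the pending closure.
  event.NotifyOn(PosixEngineClosure::TestOnlyToClosure(
      [](absl::Status status) { /* status.ok() */ }));
  event.SetReady();

  // SetReady first, NotifyOn later: the closure is scheduled immediately.
  event.SetReady();
  event.NotifyOn(PosixEngineClosure::TestOnlyToClosure(
      [](absl::Status status) { /* status.ok() */ }));

  // A closure pending at shutdown runs with the shutdown error.
  event.NotifyOn(PosixEngineClosure::TestOnlyToClosure(
      [](absl::Status status) {
        /* status == absl::CancelledError("Shutdown") */
      }));
  event.SetShutdown(absl::CancelledError("Shutdown"));

  // The real test waits for each closure to run before destroying the event.
  event.DestroyEvent();
}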

@ -30,36 +30,42 @@ namespace posix_engine {
// argument - this is important for the tcp code to function correctly. We need
// a custom closure type because the default EventEngine::Closure type doesn't
// provide a way to pass a status when the callback is run.
class IomgrEngineClosure final
class PosixEngineClosure final
: public grpc_event_engine::experimental::EventEngine::Closure {
public:
IomgrEngineClosure() = default;
IomgrEngineClosure(absl::AnyInvocable<void(absl::Status)> cb,
PosixEngineClosure() = default;
PosixEngineClosure(absl::AnyInvocable<void(absl::Status)> cb,
bool is_permanent)
: cb_(std::move(cb)),
is_permanent_(is_permanent),
status_(absl::OkStatus()) {}
~IomgrEngineClosure() final = default;
~PosixEngineClosure() final = default;
void SetStatus(absl::Status status) { status_ = status; }
void Run() override {
cb_(std::exchange(status_, absl::OkStatus()));
// We need to read the is_permanent_ variable before executing the
// enclosed callback. This is because a permanent closure may delete this
// object within the callback itself and thus reading this variable after
// the callback execution is not safe.
if (!is_permanent_) {
cb_(std::exchange(status_, absl::OkStatus()));
delete this;
} else {
cb_(std::exchange(status_, absl::OkStatus()));
}
}
// This closure doesn't clean itself up after execution. It is expected to be
// cleaned up by the caller at the appropriate time.
static IomgrEngineClosure* ToPermanentClosure(
// This closure doesn't clean itself up after execution by default. The caller
// should take care of its lifetime.
static PosixEngineClosure* ToPermanentClosure(
absl::AnyInvocable<void(absl::Status)> cb) {
return new IomgrEngineClosure(std::move(cb), true);
return new PosixEngineClosure(std::move(cb), true);
}
// This closure cleans itself up after execution. It is expected to be
// used only in tests.
static IomgrEngineClosure* TestOnlyToClosure(
static PosixEngineClosure* TestOnlyToClosure(
absl::AnyInvocable<void(absl::Status)> cb) {
return new IomgrEngineClosure(std::move(cb), false);
return new PosixEngineClosure(std::move(cb), false);
}
private:
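A lifetime sketch for the two factory helpers above (not part of the patch; the statuses used are illustrative): a permanent closure survives Run() and is owned by the caller, while a TestOnlyToClosure instance deletes itself after a single Run().

#include "absl/status/status.h"
#include "src/core/lib/event_engine/posix_engine/posix_engine_closure.h"

using grpc_event_engine::posix_engine::PosixEngineClosure;

void ClosureLifetimeSketch() {
  PosixEngineClosure* permanent = PosixEngineClosure::ToPermanentClosure(
      [](absl::Status /*status*/) { /* may run many times */ });
  permanent->SetStatus(absl::OkStatus());
  permanent->Run();  // still alive; typically re-armed via NotifyOnRead/Write
  delete permanent;  // the caller controls the lifetime

  PosixEngineClosure* one_shot = PosixEngineClosure::TestOnlyToClosure(
      [](absl::Status /*status*/) { /* runs exactly once */ });
  one_shot->SetStatus(absl::CancelledError("Shutdown"));
  one_shot->Run();  // deletes itself; the pointer must not be used again
}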

@ -22,7 +22,7 @@
namespace grpc_event_engine {
namespace experimental {
int64_t Milliseconds(EventEngine::Duration d) {
size_t Milliseconds(EventEngine::Duration d) {
return std::chrono::duration_cast<std::chrono::milliseconds>(d).count();
}

@ -16,7 +16,7 @@
#include <grpc/support/port_platform.h>
#include <stdint.h>
#include <stddef.h>
#include <grpc/event_engine/event_engine.h>
@ -24,7 +24,7 @@ namespace grpc_event_engine {
namespace experimental {
// Convert a duration to milliseconds
int64_t Milliseconds(EventEngine::Duration d);
size_t Milliseconds(EventEngine::Duration d);
} // namespace experimental
} // namespace grpc_event_engine
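A short usage note for the changed signature (not part of the patch; MillisecondsSketch is an illustrative name): durations are truncated toward zero to whole milliseconds and are now returned as size_t.

#include <chrono>
#include "src/core/lib/event_engine/time_util.h"

void MillisecondsSketch() {
  using grpc_event_engine::experimental::Milliseconds;
  size_t two_seconds = Milliseconds(std::chrono::seconds(2));        // 2000
  size_t sub_milli = Milliseconds(std::chrono::microseconds(1500));  // 1
  (void)two_seconds;
  (void)sub_milli;
}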

@ -18,7 +18,6 @@
#include <stdint.h>
#include <algorithm>
#include <chrono>
#include "absl/strings/str_cat.h"
@ -41,9 +40,5 @@ grpc_core::Timestamp ToTimestamp(grpc_core::Timestamp now,
grpc_core::Duration::Milliseconds(1);
}
size_t Milliseconds(EventEngine::Duration d) {
return std::chrono::duration_cast<std::chrono::milliseconds>(d).count();
}
} // namespace experimental
} // namespace grpc_event_engine

@ -16,8 +16,6 @@
#include <grpc/support/port_platform.h>
#include <stddef.h>
#include <string>
#include <grpc/event_engine/event_engine.h>
@ -32,8 +30,6 @@ std::string HandleToString(EventEngine::TaskHandle handle);
grpc_core::Timestamp ToTimestamp(grpc_core::Timestamp now,
EventEngine::Duration delta);
size_t Milliseconds(EventEngine::Duration d);
} // namespace experimental
} // namespace grpc_event_engine

@ -22,8 +22,8 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log_windows.h>
#include "src/core/lib/event_engine/time_util.h"
#include "src/core/lib/event_engine/trace.h"
#include "src/core/lib/event_engine/utils.h"
#include "src/core/lib/event_engine/windows/iocp.h"
#include "src/core/lib/event_engine/windows/win_socket.h"

@ -458,6 +458,7 @@ CORE_SOURCE_FILES = [
'src/core/lib/event_engine/slice.cc',
'src/core/lib/event_engine/slice_buffer.cc',
'src/core/lib/event_engine/thread_pool.cc',
'src/core/lib/event_engine/time_util.cc',
'src/core/lib/event_engine/trace.cc',
'src/core/lib/event_engine/utils.cc',
'src/core/lib/event_engine/windows/iocp.cc',

@ -73,6 +73,8 @@ grpc_cc_test(
uses_event_engine = True,
uses_polling = True,
deps = [
"//:common_event_engine_closures",
"//:event_engine_poller",
"//:posix_event_engine",
"//:posix_event_engine_closure",
"//:posix_event_engine_event_poller",

@ -14,6 +14,13 @@
#include <ostream>
#include "absl/functional/any_invocable.h"
#include "absl/time/time.h"
#include "absl/types/variant.h"
#include "src/core/lib/event_engine/poller.h"
#include "src/core/lib/event_engine/posix_engine/wakeup_fd_pipe.h"
#include "src/core/lib/event_engine/posix_engine/wakeup_fd_posix.h"
#include "src/core/lib/iomgr/port.h"
// This test won't work except with posix sockets enabled
@ -42,19 +49,23 @@
#include <grpc/support/sync.h>
#include <grpc/support/time.h>
#include "src/core/lib/event_engine/common_closures.h"
#include "src/core/lib/event_engine/posix_engine/event_poller.h"
#include "src/core/lib/event_engine/posix_engine/event_poller_posix_default.h"
#include "src/core/lib/event_engine/posix_engine/posix_engine.h"
#include "src/core/lib/event_engine/posix_engine/posix_engine_closure.h"
#include "src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.h"
#include "src/core/lib/event_engine/promise.h"
#include "src/core/lib/gprpp/dual_ref_counted.h"
#include "src/core/lib/gprpp/global_config.h"
#include "test/core/util/port.h"
GPR_GLOBAL_CONFIG_DECLARE_STRING(grpc_poll_strategy);
using ::grpc_event_engine::posix_engine::EventPoller;
using ::grpc_event_engine::posix_engine::PosixEventPoller;
static gpr_mu g_mu;
static EventPoller* g_event_poller = nullptr;
static PosixEventPoller* g_event_poller = nullptr;
// Buffer size used to send and receive data.
// 1024 is the minimal value to which the TCP send and receive buffers can be
// set.
@ -69,6 +80,12 @@ static EventPoller* g_event_poller = nullptr;
namespace grpc_event_engine {
namespace posix_engine {
using ::grpc_event_engine::experimental::Poller;
using ::grpc_event_engine::experimental::Promise;
using ::grpc_event_engine::experimental::SelfDeletingClosure;
using ::grpc_event_engine::posix_engine::PosixEventPoller;
using namespace std::chrono_literals;
namespace {
class TestScheduler : public Scheduler {
@ -78,6 +95,10 @@ class TestScheduler : public Scheduler {
engine_->Run(closure);
}
void Run(absl::AnyInvocable<void()> cb) override {
engine_->Run(std::move(cb));
}
private:
experimental::EventEngine* engine_;
};
@ -125,7 +146,7 @@ typedef struct {
EventHandle* em_fd; /* listening fd */
ssize_t read_bytes_total; /* total number of received bytes */
int done; /* set to 1 when a server finishes serving */
IomgrEngineClosure* listen_closure;
PosixEngineClosure* listen_closure;
} server;
void ServerInit(server* sv) {
@ -139,7 +160,7 @@ typedef struct {
server* sv; /* not owned by a single session */
EventHandle* em_fd; /* fd to read upload bytes */
char read_buf[BUF_SIZE]; /* buffer to store upload bytes */
IomgrEngineClosure* session_read_closure;
PosixEngineClosure* session_read_closure;
} session;
// Called when an upload session can be safely shutdown.
@ -185,7 +206,7 @@ void SessionReadCb(session* se, absl::Status status) {
// in the polling thread, such that polling only happens after this
// callback, and will catch read edge event if data is available again
// before notify_on_read.
se->session_read_closure = IomgrEngineClosure::TestOnlyToClosure(
se->session_read_closure = PosixEngineClosure::TestOnlyToClosure(
[se](absl::Status status) { SessionReadCb(se, status); });
se->em_fd->NotifyOnRead(se->session_read_closure);
}
@ -220,7 +241,7 @@ void ListenCb(server* sv, absl::Status status) {
reinterpret_cast<struct sockaddr*>(&ss), &slen);
} while (fd < 0 && errno == EINTR);
if (fd < 0 && errno == EAGAIN) {
sv->listen_closure = IomgrEngineClosure::TestOnlyToClosure(
sv->listen_closure = PosixEngineClosure::TestOnlyToClosure(
[sv](absl::Status status) { ListenCb(sv, status); });
listen_em_fd->NotifyOnRead(sv->listen_closure);
return;
@ -232,10 +253,10 @@ void ListenCb(server* sv, absl::Status status) {
se = static_cast<session*>(gpr_malloc(sizeof(*se)));
se->sv = sv;
se->em_fd = g_event_poller->CreateHandle(fd, "listener", false);
se->session_read_closure = IomgrEngineClosure::TestOnlyToClosure(
se->session_read_closure = PosixEngineClosure::TestOnlyToClosure(
[se](absl::Status status) { SessionReadCb(se, status); });
se->em_fd->NotifyOnRead(se->session_read_closure);
sv->listen_closure = IomgrEngineClosure::TestOnlyToClosure(
sv->listen_closure = PosixEngineClosure::TestOnlyToClosure(
[sv](absl::Status status) { ListenCb(sv, status); });
listen_em_fd->NotifyOnRead(sv->listen_closure);
}
@ -258,7 +279,7 @@ int ServerStart(server* sv) {
EXPECT_EQ(listen(fd, MAX_NUM_FD), 0);
sv->em_fd = g_event_poller->CreateHandle(fd, "server", false);
sv->listen_closure = IomgrEngineClosure::TestOnlyToClosure(
sv->listen_closure = PosixEngineClosure::TestOnlyToClosure(
[sv](absl::Status status) { ListenCb(sv, status); });
sv->em_fd->NotifyOnRead(sv->listen_closure);
return port;
@ -275,7 +296,7 @@ typedef struct {
// notify_on_write to schedule another write.
int client_write_cnt;
int done;
IomgrEngineClosure* write_closure;
PosixEngineClosure* write_closure;
} client;
void ClientInit(client* cl) {
@ -312,7 +333,7 @@ void ClientSessionWrite(client* cl, absl::Status status) {
EXPECT_EQ(errno, EAGAIN);
gpr_mu_lock(&g_mu);
if (cl->client_write_cnt < CLIENT_TOTAL_WRITE_CNT) {
cl->write_closure = IomgrEngineClosure::TestOnlyToClosure(
cl->write_closure = PosixEngineClosure::TestOnlyToClosure(
[cl](absl::Status status) { ClientSessionWrite(cl, status); });
cl->client_write_cnt++;
gpr_mu_unlock(&g_mu);
@ -351,16 +372,18 @@ void ClientStart(client* cl, int port) {
// Wait for the signal to shutdown client and server.
void WaitAndShutdown(server* sv, client* cl) {
std::vector<EventHandle*> pending_events;
Poller::WorkResult result;
gpr_mu_lock(&g_mu);
while (!sv->done || !cl->done) {
gpr_mu_unlock(&g_mu);
(void)g_event_poller->Work(grpc_core::Timestamp::InfFuture(),
pending_events);
for (auto it = pending_events.begin(); it != pending_events.end(); ++it) {
(*it)->ExecutePendingActions();
result = g_event_poller->Work(24h);
if (absl::holds_alternative<Poller::Events>(result)) {
auto pending_events = absl::get<Poller::Events>(result);
for (auto it = pending_events.begin(); it != pending_events.end(); ++it) {
(*it)->Run();
}
pending_events.clear();
}
pending_events.clear();
gpr_mu_lock(&g_mu);
}
gpr_mu_unlock(&g_mu);
@ -382,13 +405,20 @@ class EventPollerTest : public ::testing::TestWithParam<std::string> {
EXPECT_NE(scheduler_, nullptr);
GPR_GLOBAL_CONFIG_SET(grpc_poll_strategy, GetParam().c_str());
g_event_poller = GetDefaultPoller(scheduler_.get());
if (g_event_poller != nullptr) {
EXPECT_EQ(g_event_poller->Name(), GetParam());
}
}
void TearDown() override {
if (g_event_poller != nullptr) {
g_event_poller->Shutdown();
}
}
public:
TestScheduler* Scheduler() { return scheduler_.get(); }
private:
std::unique_ptr<grpc_event_engine::experimental::PosixEventEngine> engine_;
std::unique_ptr<grpc_event_engine::posix_engine::TestScheduler> scheduler_;
@ -449,9 +479,9 @@ TEST_P(EventPollerTest, TestEventPollerHandleChange) {
if (g_event_poller == nullptr) {
return;
}
IomgrEngineClosure* first_closure = IomgrEngineClosure::TestOnlyToClosure(
PosixEngineClosure* first_closure = PosixEngineClosure::TestOnlyToClosure(
[a = &a](absl::Status status) { FirstReadCallback(a, status); });
IomgrEngineClosure* second_closure = IomgrEngineClosure::TestOnlyToClosure(
PosixEngineClosure* second_closure = PosixEngineClosure::TestOnlyToClosure(
[b = &b](absl::Status status) { SecondReadCallback(b, status); });
InitChangeData(&a);
InitChangeData(&b);
@ -473,16 +503,19 @@ TEST_P(EventPollerTest, TestEventPollerHandleChange) {
// And now wait for it to run.
auto poller_work = [](FdChangeData* fdc) {
std::vector<EventHandle*> pending_events;
Poller::WorkResult result;
gpr_mu_lock(&g_mu);
while (fdc->cb_that_ran == nullptr) {
gpr_mu_unlock(&g_mu);
(void)g_event_poller->Work(grpc_core::Timestamp::InfFuture(),
pending_events);
for (auto it = pending_events.begin(); it != pending_events.end(); ++it) {
(*it)->ExecutePendingActions();
result = g_event_poller->Work(24h);
if (absl::holds_alternative<Poller::Events>(result)) {
auto pending_events = absl::get<Poller::Events>(result);
for (auto it = pending_events.begin(); it != pending_events.end();
++it) {
(*it)->Run();
}
pending_events.clear();
}
pending_events.clear();
gpr_mu_lock(&g_mu);
}
};
@ -513,7 +546,192 @@ TEST_P(EventPollerTest, TestEventPollerHandleChange) {
close(sv[1]);
}
INSTANTIATE_TEST_SUITE_P(EventPoller, EventPollerTest,
std::atomic<int> kTotalActiveWakeupFdHandles{0};
// A helper class representing one file descriptor. It is implemented using
// a WakeupFd. It registers itself with the poller and waits to be notified
// of read events. Upon receiving a read event, it (1) processes it,
// (2) registers to be notified of the next read event and (3) schedules
// generation of the next read event. The Fd orphans itself after processing
// a specified number of read events.
class WakeupFdHandle : public grpc_core::DualRefCounted<WakeupFdHandle> {
public:
WakeupFdHandle(int num_wakeups, Scheduler* scheduler,
PosixEventPoller* poller)
: num_wakeups_(num_wakeups),
scheduler_(scheduler),
poller_(poller),
on_read_(
PosixEngineClosure::ToPermanentClosure([this](absl::Status status) {
EXPECT_TRUE(status.ok());
status = ReadPipe();
if (!status.ok()) {
// Rarely, the epoll1 poller may generate an EPOLLHUP, which is a
// spurious wakeup. The poll based poller is also likely to generate
// many spurious wakeups because of the level triggered nature of
// poll. In such cases do not change the number of wakeups received.
EXPECT_EQ(status, absl::InternalError("Spurious Wakeup"));
handle_->NotifyOnRead(on_read_);
return;
}
if (--num_wakeups_ == 0) {
// This should invoke the registered NotifyOnRead callbacks with
// the shutdown error. When those callbacks call Unref(), the
// WakeupFdHandle should call OrphanHandle in the Unref() method
// implementation.
handle_->ShutdownHandle(absl::InternalError("Shutting down"));
Unref();
} else {
handle_->NotifyOnRead(on_read_);
Ref().release();
// Schedule next wakeup to trigger the registered NotifyOnRead
// callback.
scheduler_->Run(SelfDeletingClosure::Create([this]() {
// Send next wakeup.
EXPECT_TRUE(wakeup_fd_->Wakeup().ok());
Unref();
}));
}
})) {
WeakRef().release();
++kTotalActiveWakeupFdHandles;
EXPECT_GT(num_wakeups_, 0);
EXPECT_NE(scheduler_, nullptr);
EXPECT_NE(poller_, nullptr);
wakeup_fd_ = *PipeWakeupFd::CreatePipeWakeupFd();
handle_ = poller_->CreateHandle(wakeup_fd_->ReadFd(), "test", false);
EXPECT_NE(handle_, nullptr);
handle_->NotifyOnRead(on_read_);
// Send a wakeup initially.
EXPECT_TRUE(wakeup_fd_->Wakeup().ok());
}
~WakeupFdHandle() override { delete on_read_; }
void Orphan() override {
// Once the handle has orphaned itself, decrement
// kTotalActiveWakeupFdHandles. Once all handles have orphaned themselves,
// send a Kick to the poller.
handle_->OrphanHandle(
PosixEngineClosure::TestOnlyToClosure(
[poller = poller_, wakeupfd_handle = this](absl::Status status) {
EXPECT_TRUE(status.ok());
if (--kTotalActiveWakeupFdHandles == 0) {
poller->Kick();
}
wakeupfd_handle->WeakUnref();
}),
nullptr, "");
}
private:
absl::Status ReadPipe() {
char buf[128];
ssize_t r;
int total_bytes_read = 0;
for (;;) {
r = read(wakeup_fd_->ReadFd(), buf, sizeof(buf));
if (r > 0) {
total_bytes_read += r;
continue;
}
if (r == 0) return absl::OkStatus();
switch (errno) {
case EAGAIN:
return total_bytes_read > 0 ? absl::OkStatus()
: absl::InternalError("Spurious Wakeup");
case EINTR:
continue;
default:
return absl::Status(absl::StatusCode::kInternal,
absl::StrCat("read: ", strerror(errno)));
}
}
}
int num_wakeups_;
Scheduler* scheduler_;
PosixEventPoller* poller_;
PosixEngineClosure* on_read_;
std::unique_ptr<WakeupFd> wakeup_fd_;
EventHandle* handle_;
};
// A helper class to create Fds and drive the polling for these Fds. It
// repeatedly calls the Work(..) method on the poller to get pending events,
// then schedules another parallel Work(..) instantiation and processes these
// pending events. This continues until all Fds have orphaned themselves.
class Worker : public grpc_core::DualRefCounted<Worker> {
public:
Worker(Scheduler* scheduler, PosixEventPoller* poller, int num_handles,
int num_wakeups_per_handle)
: scheduler_(scheduler), poller_(poller) {
handles_.reserve(num_handles);
for (int i = 0; i < num_handles; i++) {
handles_.push_back(
new WakeupFdHandle(num_wakeups_per_handle, scheduler_, poller_));
}
WeakRef().release();
}
void Orphan() override { promise.Set(true); }
void Start() {
// Start executing Work(..).
scheduler_->Run([this]() { Work(); });
}
void Wait() {
EXPECT_TRUE(promise.WaitWithTimeout(absl::Seconds(60)));
WeakUnref();
}
private:
void Work() {
auto result = g_event_poller->Work(24h);
if (absl::holds_alternative<Poller::Events>(result)) {
// Schedule next work instantiation immediately and take a Ref for
// the next instantiation.
Ref().release();
scheduler_->Run([this]() { Work(); });
// Process pending events of current Work(..) instantiation.
auto pending_events = absl::get<Poller::Events>(result);
for (auto it = pending_events.begin(); it != pending_events.end(); ++it) {
(*it)->Run();
}
pending_events.clear();
// Corresponds to the Ref taken for the current instantiation.
Unref();
} else {
// The poller got kicked. This can only happen when all the Fds have
// orphaned themselves.
EXPECT_TRUE(absl::holds_alternative<Poller::Kicked>(result));
Unref();
}
}
Scheduler* scheduler_;
PosixEventPoller* poller_;
Promise<bool> promise;
std::vector<WakeupFdHandle*> handles_;
};
// This test creates kNumHandles file descriptors and generates
// kNumWakeupsPerHandle separate read events for each created Fd. The Fds use
// the NotifyOnRead API to wait for a read event; upon receiving one they
// process it immediately and register for the next read event. A new read
// event is also generated for each Fd in parallel after the previous one is
// processed.
TEST_P(EventPollerTest, TestMultipleHandles) {
static constexpr int kNumHandles = 100;
static constexpr int kNumWakeupsPerHandle = 100;
if (g_event_poller == nullptr) {
return;
}
Worker* worker = new Worker(Scheduler(), g_event_poller, kNumHandles,
kNumWakeupsPerHandle);
worker->Start();
worker->Wait();
}
INSTANTIATE_TEST_SUITE_P(PosixEventPoller, EventPollerTest,
::testing::ValuesIn({std::string("epoll1"),
std::string("poll")}),
&TestScenarioName);

@ -38,6 +38,10 @@ class TestScheduler : public Scheduler {
engine_->Run(closure);
}
void Run(absl::AnyInvocable<void()> cb) override {
engine_->Run(std::move(cb));
}
private:
grpc_event_engine::experimental::EventEngine* engine_;
};
@ -57,7 +61,7 @@ TEST(LockFreeEventTest, BasicTest) {
grpc_core::MutexLock lock(&mu);
// Set NotifyOn first and then SetReady
event.NotifyOn(
IomgrEngineClosure::TestOnlyToClosure([&mu, &cv](absl::Status status) {
PosixEngineClosure::TestOnlyToClosure([&mu, &cv](absl::Status status) {
grpc_core::MutexLock lock(&mu);
EXPECT_TRUE(status.ok());
cv.Signal();
@ -68,7 +72,7 @@ TEST(LockFreeEventTest, BasicTest) {
// SetReady first first and then call NotifyOn
event.SetReady();
event.NotifyOn(
IomgrEngineClosure::TestOnlyToClosure([&mu, &cv](absl::Status status) {
PosixEngineClosure::TestOnlyToClosure([&mu, &cv](absl::Status status) {
grpc_core::MutexLock lock(&mu);
EXPECT_TRUE(status.ok());
cv.Signal();
@ -77,7 +81,7 @@ TEST(LockFreeEventTest, BasicTest) {
// Set NotifyOn and then call SetShutdown
event.NotifyOn(
IomgrEngineClosure::TestOnlyToClosure([&mu, &cv](absl::Status status) {
PosixEngineClosure::TestOnlyToClosure([&mu, &cv](absl::Status status) {
grpc_core::MutexLock lock(&mu);
EXPECT_FALSE(status.ok());
EXPECT_EQ(status, absl::CancelledError("Shutdown"));
@ -110,7 +114,7 @@ TEST(LockFreeEventTest, MultiThreadedTest) {
}
active++;
if (thread_id == 0) {
event.NotifyOn(IomgrEngineClosure::TestOnlyToClosure(
event.NotifyOn(PosixEngineClosure::TestOnlyToClosure(
[&mu, &cv, &signalled](absl::Status status) {
grpc_core::MutexLock lock(&mu);
EXPECT_TRUE(status.ok());

@ -1972,6 +1972,8 @@ src/core/lib/event_engine/slice_buffer.cc \
src/core/lib/event_engine/socket_notifier.h \
src/core/lib/event_engine/thread_pool.cc \
src/core/lib/event_engine/thread_pool.h \
src/core/lib/event_engine/time_util.cc \
src/core/lib/event_engine/time_util.h \
src/core/lib/event_engine/trace.cc \
src/core/lib/event_engine/trace.h \
src/core/lib/event_engine/utils.cc \

@ -1762,6 +1762,8 @@ src/core/lib/event_engine/slice_buffer.cc \
src/core/lib/event_engine/socket_notifier.h \
src/core/lib/event_engine/thread_pool.cc \
src/core/lib/event_engine/thread_pool.h \
src/core/lib/event_engine/time_util.cc \
src/core/lib/event_engine/time_util.h \
src/core/lib/event_engine/trace.cc \
src/core/lib/event_engine/trace.h \
src/core/lib/event_engine/utils.cc \
