Update to the event engine poller interface (#30828)

* change poller interface for posix event engine

* update event_poller_posix_test

* windows ee changes

* update comment

* remove unused deps

* review comments

* fix build issue

* fix sanity

* fix build

* Automated change: Fix sanity tests

* review comments

* remove unused dep

* sanity

* fix typo and address review comments

* review comments

* Automated change: Fix sanity tests

* fix typo in comment

Co-authored-by: Vignesh2208 <Vignesh2208@users.noreply.github.com>
pull/31007/head
Vignesh Babu 2 years ago committed by GitHub
parent e21199e7c9
commit 4e7f0e1eac
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 11
      BUILD
  2. 26
      src/core/lib/event_engine/poller.h
  3. 53
      src/core/lib/event_engine/posix_engine/ev_epoll1_linux.cc
  4. 9
      src/core/lib/event_engine/posix_engine/ev_epoll1_linux.h
  5. 62
      src/core/lib/event_engine/posix_engine/ev_poll_posix.cc
  6. 4
      src/core/lib/event_engine/posix_engine/ev_poll_posix.h
  7. 18
      src/core/lib/event_engine/windows/iocp.cc
  8. 3
      src/core/lib/event_engine/windows/iocp.h
  9. 50
      test/core/event_engine/posix/event_poller_posix_test.cc
  10. 66
      test/core/event_engine/windows/iocp_test.cc

11
BUILD

@ -2390,10 +2390,7 @@ grpc_cc_library(
hdrs = [ hdrs = [
"src/core/lib/event_engine/poller.h", "src/core/lib/event_engine/poller.h",
], ],
external_deps = [ external_deps = ["absl/functional:function_ref"],
"absl/container:inlined_vector",
"absl/types:variant",
],
deps = [ deps = [
"event_engine_base_hdrs", "event_engine_base_hdrs",
"gpr_platform", "gpr_platform",
@ -2646,7 +2643,8 @@ grpc_cc_library(
], ],
external_deps = [ external_deps = [
"absl/base:core_headers", "absl/base:core_headers",
"absl/functional:any_invocable", "absl/container:inlined_vector",
"absl/functional:function_ref",
"absl/memory", "absl/memory",
"absl/status", "absl/status",
"absl/status:statusor", "absl/status:statusor",
@ -2654,7 +2652,6 @@ grpc_cc_library(
"absl/synchronization", "absl/synchronization",
], ],
deps = [ deps = [
"common_event_engine_closures",
"event_engine_base_hdrs", "event_engine_base_hdrs",
"event_engine_poller", "event_engine_poller",
"event_engine_time_util", "event_engine_time_util",
@ -2678,7 +2675,9 @@ grpc_cc_library(
], ],
external_deps = [ external_deps = [
"absl/base:core_headers", "absl/base:core_headers",
"absl/container:inlined_vector",
"absl/functional:any_invocable", "absl/functional:any_invocable",
"absl/functional:function_ref",
"absl/status", "absl/status",
"absl/status:statusor", "absl/status:statusor",
"absl/strings", "absl/strings",

@ -16,8 +16,7 @@
#include <grpc/support/port_platform.h> #include <grpc/support/port_platform.h>
#include "absl/container/inlined_vector.h" #include "absl/functional/function_ref.h"
#include "absl/types/variant.h"
#include <grpc/event_engine/event_engine.h> #include <grpc/event_engine/event_engine.h>
@ -30,20 +29,23 @@ namespace experimental {
// Work(...). // Work(...).
class Poller { class Poller {
public: public:
// This initial vector size may need to be tuned enum class WorkResult { kOk, kDeadlineExceeded, kKicked };
using Events = absl::InlinedVector<EventEngine::Closure*, 5>;
struct DeadlineExceeded {};
struct Kicked {};
using WorkResult = absl::variant<Events, DeadlineExceeded, Kicked>;
virtual ~Poller() = default; virtual ~Poller() = default;
// Poll once for events, returning a collection of Closures to be executed. // Poll once for events and process received events. The callback function
// "schedule_poll_again" is expected to be run synchronously prior to
// processing received events. The callback's responsibility primarily is to
// schedule Poller::Work asynchronously again. This would ensure that the next
// polling cycle would run as quickly as possible to ensure continuous
// polling.
// //
// Returns: // Returns:
// * absl::AbortedError if it was Kicked. // * Poller::WorkResult::kKicked if it was Kicked.
// * absl::DeadlineExceeded if timeout occurred // * Poller::WorkResult::kDeadlineExceeded if timeout occurred
// * A collection of closures to execute, otherwise // * Poller::WorkResult::kOk, otherwise indicating that the callback function
virtual WorkResult Work(EventEngine::Duration timeout) = 0; // was run synchonously before some events were processed.
virtual WorkResult Work(EventEngine::Duration timeout,
absl::FunctionRef<void()> schedule_poll_again) = 0;
// Trigger the threads executing Work(..) to break out as soon as possible. // Trigger the threads executing Work(..) to break out as soon as possible.
virtual void Kick() = 0; virtual void Kick() = 0;
}; };

@ -20,7 +20,6 @@
#include <atomic> #include <atomic>
#include <memory> #include <memory>
#include "absl/functional/any_invocable.h"
#include "absl/memory/memory.h" #include "absl/memory/memory.h"
#include "absl/status/status.h" #include "absl/status/status.h"
#include "absl/status/statusor.h" #include "absl/status/statusor.h"
@ -45,7 +44,6 @@
#include "absl/synchronization/mutex.h" #include "absl/synchronization/mutex.h"
#include "src/core/lib/event_engine/common_closures.h"
#include "src/core/lib/event_engine/posix_engine/event_poller.h" #include "src/core/lib/event_engine/posix_engine/event_poller.h"
#include "src/core/lib/event_engine/posix_engine/lockfree_event.h" #include "src/core/lib/event_engine/posix_engine/lockfree_event.h"
#include "src/core/lib/event_engine/posix_engine/posix_engine_closure.h" #include "src/core/lib/event_engine/posix_engine/posix_engine_closure.h"
@ -61,7 +59,6 @@ using ::grpc_event_engine::posix_engine::WakeupFd;
namespace grpc_event_engine { namespace grpc_event_engine {
namespace posix_engine { namespace posix_engine {
using ::grpc_event_engine::experimental::AnyInvocableClosure;
using ::grpc_event_engine::experimental::EventEngine; using ::grpc_event_engine::experimental::EventEngine;
using ::grpc_event_engine::experimental::Poller; using ::grpc_event_engine::experimental::Poller;
using ::grpc_event_engine::posix_engine::LockfreeEvent; using ::grpc_event_engine::posix_engine::LockfreeEvent;
@ -73,7 +70,6 @@ class Epoll1EventHandle : public EventHandle {
: fd_(fd), : fd_(fd),
list_(this), list_(this),
poller_(poller), poller_(poller),
exec_actions_closure_([this]() { ExecutePendingActions(); }),
read_closure_(absl::make_unique<LockfreeEvent>(poller->GetScheduler())), read_closure_(absl::make_unique<LockfreeEvent>(poller->GetScheduler())),
write_closure_( write_closure_(
absl::make_unique<LockfreeEvent>(poller->GetScheduler())), absl::make_unique<LockfreeEvent>(poller->GetScheduler())),
@ -96,8 +92,8 @@ class Epoll1EventHandle : public EventHandle {
pending_error_.store(false, std::memory_order_relaxed); pending_error_.store(false, std::memory_order_relaxed);
} }
Epoll1Poller* Poller() { return poller_; } Epoll1Poller* Poller() { return poller_; }
EventEngine::Closure* SetPendingActions(bool pending_read, bool pending_write, bool SetPendingActions(bool pending_read, bool pending_write,
bool pending_error) { bool pending_error) {
// Another thread may be executing ExecutePendingActions() at this point // Another thread may be executing ExecutePendingActions() at this point
// This is possible for instance, if one instantiation of Work(..) sets // This is possible for instance, if one instantiation of Work(..) sets
// an fd to be readable while the next instantiation of Work(...) may // an fd to be readable while the next instantiation of Work(...) may
@ -118,10 +114,7 @@ class Epoll1EventHandle : public EventHandle {
pending_error_.store(true, std::memory_order_release); pending_error_.store(true, std::memory_order_release);
} }
if (pending_read || pending_write || pending_error) { return pending_read || pending_write || pending_error;
return &exec_actions_closure_;
}
return nullptr;
} }
int WrappedFd() override { return fd_; } int WrappedFd() override { return fd_; }
void OrphanHandle(PosixEngineClosure* on_done, int* release_fd, void OrphanHandle(PosixEngineClosure* on_done, int* release_fd,
@ -167,7 +160,6 @@ class Epoll1EventHandle : public EventHandle {
std::atomic<bool> pending_error_{false}; std::atomic<bool> pending_error_{false};
Epoll1Poller::HandlesList list_; Epoll1Poller::HandlesList list_;
Epoll1Poller* poller_; Epoll1Poller* poller_;
AnyInvocableClosure exec_actions_closure_;
std::unique_ptr<LockfreeEvent> read_closure_; std::unique_ptr<LockfreeEvent> read_closure_;
std::unique_ptr<LockfreeEvent> write_closure_; std::unique_ptr<LockfreeEvent> write_closure_;
std::unique_ptr<LockfreeEvent> error_closure_; std::unique_ptr<LockfreeEvent> error_closure_;
@ -432,7 +424,7 @@ EventHandle* Epoll1Poller::CreateHandle(int fd, absl::string_view /*name*/,
// function. It also returns the list of closures to run to take action // function. It also returns the list of closures to run to take action
// on file descriptors that became readable/writable. // on file descriptors that became readable/writable.
bool Epoll1Poller::ProcessEpollEvents(int max_epoll_events_to_handle, bool Epoll1Poller::ProcessEpollEvents(int max_epoll_events_to_handle,
Poller::Events& pending_events) { Events& pending_events) {
int64_t num_events = g_epoll_set_.num_events; int64_t num_events = g_epoll_set_.num_events;
int64_t cursor = g_epoll_set_.cursor; int64_t cursor = g_epoll_set_.cursor;
bool was_kicked = false; bool was_kicked = false;
@ -454,10 +446,10 @@ bool Epoll1Poller::ProcessEpollEvents(int max_epoll_events_to_handle,
bool read_ev = (ev->events & (EPOLLIN | EPOLLPRI)) != 0; bool read_ev = (ev->events & (EPOLLIN | EPOLLPRI)) != 0;
bool write_ev = (ev->events & EPOLLOUT) != 0; bool write_ev = (ev->events & EPOLLOUT) != 0;
bool err_fallback = error && !track_err; bool err_fallback = error && !track_err;
if (EventEngine::Closure* closure = handle->SetPendingActions( if (handle->SetPendingActions(read_ev || cancel || err_fallback,
read_ev || cancel || err_fallback, write_ev || cancel || err_fallback,
write_ev || cancel || err_fallback, error && !err_fallback)) { error && !err_fallback)) {
pending_events.push_back(closure); pending_events.push_back(handle);
} }
} }
} }
@ -522,13 +514,16 @@ void Epoll1EventHandle::SetWritable() { write_closure_->SetReady(); }
void Epoll1EventHandle::SetHasError() { error_closure_->SetReady(); } void Epoll1EventHandle::SetHasError() { error_closure_->SetReady(); }
// Polls the registered Fds for events until timeout is reached or there is a // Polls the registered Fds for events until timeout is reached or there is a
// Kick(). If there is a Kick(), it returns any previously un-processed events. // Kick(). If there is a Kick(), it collects and processes any previously
// If there are no un-processed events, it returns Poller::WorkResult::Kicked{} // un-processed events. If there are no un-processed events, it returns
Poller::WorkResult Epoll1Poller::Work(EventEngine::Duration timeout) { // Poller::WorkResult::Kicked{}
Poller::Events pending_events; Poller::WorkResult Epoll1Poller::Work(
EventEngine::Duration timeout,
absl::FunctionRef<void()> schedule_poll_again) {
Events pending_events;
if (g_epoll_set_.cursor == g_epoll_set_.num_events) { if (g_epoll_set_.cursor == g_epoll_set_.num_events) {
if (DoEpollWait(timeout) == 0) { if (DoEpollWait(timeout) == 0) {
return Poller::DeadlineExceeded{}; return Poller::WorkResult::kDeadlineExceeded;
} }
} }
{ {
@ -540,10 +535,16 @@ Poller::WorkResult Epoll1Poller::Work(EventEngine::Duration timeout) {
was_kicked_ = false; was_kicked_ = false;
} }
if (pending_events.empty()) { if (pending_events.empty()) {
return Poller::Kicked{}; return Poller::WorkResult::kKicked;
} }
return pending_events;
} }
// Run the provided callback.
schedule_poll_again();
// Process all pending events inline.
for (auto& it : pending_events) {
it->ExecutePendingActions();
}
return Poller::WorkResult::kOk;
} }
void Epoll1Poller::Kick() { void Epoll1Poller::Kick() {
@ -589,7 +590,7 @@ EventHandle* Epoll1Poller::CreateHandle(int /*fd*/, absl::string_view /*name*/,
} }
bool Epoll1Poller::ProcessEpollEvents(int /*max_epoll_events_to_handle*/, bool Epoll1Poller::ProcessEpollEvents(int /*max_epoll_events_to_handle*/,
Poller::Events& /*pending_events*/) { Events& /*pending_events*/) {
GPR_ASSERT(false && "unimplemented"); GPR_ASSERT(false && "unimplemented");
} }
@ -597,7 +598,9 @@ int Epoll1Poller::DoEpollWait(EventEngine::Duration /*timeout*/) {
GPR_ASSERT(false && "unimplemented"); GPR_ASSERT(false && "unimplemented");
} }
Poller::WorkResult Epoll1Poller::Work(EventEngine::Duration /*timeout*/) { Poller::WorkResult Epoll1Poller::Work(
EventEngine::Duration /*timeout*/,
absl::FunctionRef<void()> /*schedule_poll_again*/) {
GPR_ASSERT(false && "unimplemented"); GPR_ASSERT(false && "unimplemented");
} }

@ -21,6 +21,8 @@
#include <string> #include <string>
#include "absl/base/thread_annotations.h" #include "absl/base/thread_annotations.h"
#include "absl/container/inlined_vector.h"
#include "absl/functional/function_ref.h"
#include "absl/strings/string_view.h" #include "absl/strings/string_view.h"
#include "absl/synchronization/mutex.h" #include "absl/synchronization/mutex.h"
@ -49,7 +51,8 @@ class Epoll1Poller : public PosixEventPoller {
EventHandle* CreateHandle(int fd, absl::string_view name, EventHandle* CreateHandle(int fd, absl::string_view name,
bool track_err) override; bool track_err) override;
Poller::WorkResult Work( Poller::WorkResult Work(
grpc_event_engine::experimental::EventEngine::Duration timeout) override; grpc_event_engine::experimental::EventEngine::Duration timeout,
absl::FunctionRef<void()> schedule_poll_again) override;
std::string Name() override { return "epoll1"; } std::string Name() override { return "epoll1"; }
void Kick() override; void Kick() override;
Scheduler* GetScheduler() { return scheduler_; } Scheduler* GetScheduler() { return scheduler_; }
@ -57,6 +60,8 @@ class Epoll1Poller : public PosixEventPoller {
~Epoll1Poller() override; ~Epoll1Poller() override;
private: private:
// This initial vector size may need to be tuned
using Events = absl::InlinedVector<Epoll1EventHandle*, 5>;
// Process the epoll events found by DoEpollWait() function. // Process the epoll events found by DoEpollWait() function.
// - g_epoll_set.cursor points to the index of the first event to be processed // - g_epoll_set.cursor points to the index of the first event to be processed
// - This function then processes up-to max_epoll_events_to_handle and // - This function then processes up-to max_epoll_events_to_handle and
@ -65,7 +70,7 @@ class Epoll1Poller : public PosixEventPoller {
// function. It also returns the list of closures to run to take action // function. It also returns the list of closures to run to take action
// on file descriptors that became readable/writable. // on file descriptors that became readable/writable.
bool ProcessEpollEvents(int max_epoll_events_to_handle, bool ProcessEpollEvents(int max_epoll_events_to_handle,
Poller::Events& pending_events); Events& pending_events);
// Do epoll_wait and store the events in g_epoll_set.events field. This does // Do epoll_wait and store the events in g_epoll_set.events field. This does
// not "process" any of the events yet; that is done in ProcessEpollEvents(). // not "process" any of the events yet; that is done in ProcessEpollEvents().
// See ProcessEpollEvents() function for more details. It returns the number // See ProcessEpollEvents() function for more details. It returns the number

@ -24,6 +24,7 @@
#include <memory> #include <memory>
#include <utility> #include <utility>
#include "absl/container/inlined_vector.h"
#include "absl/functional/any_invocable.h" #include "absl/functional/any_invocable.h"
#include "absl/status/status.h" #include "absl/status/status.h"
#include "absl/status/statusor.h" #include "absl/status/statusor.h"
@ -75,6 +76,7 @@ using ::grpc_event_engine::experimental::AnyInvocableClosure;
using ::grpc_event_engine::experimental::EventEngine; using ::grpc_event_engine::experimental::EventEngine;
using ::grpc_event_engine::experimental::Poller; using ::grpc_event_engine::experimental::Poller;
using ::grpc_event_engine::posix_engine::WakeupFd; using ::grpc_event_engine::posix_engine::WakeupFd;
using Events = absl::InlinedVector<PollEventHandle*, 5>;
class PollEventHandle : public EventHandle { class PollEventHandle : public EventHandle {
public: public:
@ -102,8 +104,7 @@ class PollEventHandle : public EventHandle {
poller_->PollerHandlesListAddHandle(this); poller_->PollerHandlesListAddHandle(this);
} }
PollPoller* Poller() { return poller_; } PollPoller* Poller() { return poller_; }
EventEngine::Closure* SetPendingActions(bool pending_read, bool SetPendingActions(bool pending_read, bool pending_write) {
bool pending_write) {
pending_actions_ |= pending_read; pending_actions_ |= pending_read;
if (pending_write) { if (pending_write) {
pending_actions_ |= (1 << 2); pending_actions_ |= (1 << 2);
@ -112,9 +113,9 @@ class PollEventHandle : public EventHandle {
// The closure is going to be executed. We'll Unref this handle in // The closure is going to be executed. We'll Unref this handle in
// ExecutePendingActions. // ExecutePendingActions.
Ref(); Ref();
return &exec_actions_closure_; return true;
} }
return nullptr; return false;
} }
void ForceRemoveHandleFromPoller() { void ForceRemoveHandleFromPoller() {
absl::MutexLock lock(&poller_->mu_); absl::MutexLock lock(&poller_->mu_);
@ -202,7 +203,7 @@ class PollEventHandle : public EventHandle {
} }
uint32_t BeginPollLocked(uint32_t read_mask, uint32_t write_mask) uint32_t BeginPollLocked(uint32_t read_mask, uint32_t write_mask)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
EventEngine::Closure* EndPollLocked(int got_read, int got_write) bool EndPollLocked(bool got_read, bool got_write)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
private: private:
@ -559,15 +560,13 @@ uint32_t PollEventHandle::BeginPollLocked(uint32_t read_mask,
return mask; return mask;
} }
EventEngine::Closure* PollEventHandle::EndPollLocked(int got_read, bool PollEventHandle::EndPollLocked(bool got_read, bool got_write) {
int got_write) {
EventEngine::Closure* closure = nullptr;
if (is_orphaned_ && !IsWatched()) { if (is_orphaned_ && !IsWatched()) {
CloseFd(); CloseFd();
} else if (!is_orphaned_) { } else if (!is_orphaned_) {
closure = SetPendingActions(got_read, got_write); return SetPendingActions(got_read, got_write);
} }
return closure; return false;
} }
void PollPoller::KickExternal(bool ext) { void PollPoller::KickExternal(bool ext) {
@ -641,13 +640,15 @@ PollPoller::~PollPoller() {
GPR_ASSERT(poll_handles_list_head_ == nullptr); GPR_ASSERT(poll_handles_list_head_ == nullptr);
} }
Poller::WorkResult PollPoller::Work(EventEngine::Duration timeout) { Poller::WorkResult PollPoller::Work(
EventEngine::Duration timeout,
absl::FunctionRef<void()> schedule_poll_again) {
// Avoid malloc for small number of elements. // Avoid malloc for small number of elements.
enum { inline_elements = 96 }; enum { inline_elements = 96 };
struct pollfd pollfd_space[inline_elements]; struct pollfd pollfd_space[inline_elements];
bool was_kicked_ext = false; bool was_kicked_ext = false;
PollEventHandle* watcher_space[inline_elements]; PollEventHandle* watcher_space[inline_elements];
Poller::Events pending_events; Events pending_events;
int timeout_ms = int timeout_ms =
static_cast<int>(grpc_event_engine::experimental::Milliseconds(timeout)); static_cast<int>(grpc_event_engine::experimental::Milliseconds(timeout));
mu_.Lock(); mu_.Lock();
@ -733,25 +734,25 @@ Poller::WorkResult PollPoller::Work(EventEngine::Duration timeout) {
// This case implies the fd was polled (since watch_mask > 0 and // This case implies the fd was polled (since watch_mask > 0 and
// the poll returned an error. Mark the fds as both readable and // the poll returned an error. Mark the fds as both readable and
// writable. // writable.
if (EventEngine::Closure* closure = head->EndPollLocked(1, 1)) { if (head->EndPollLocked(true, true)) {
// Its safe to add to list of pending events because // Its safe to add to list of pending events because
// EndPollLocked returns a +ve number only when the handle is // EndPollLocked returns true only when the handle is
// not orphaned. But an orphan might be initiated on the handle // not orphaned. But an orphan might be initiated on the handle
// after this Work() method returns and before the next Work() // after this Work() method returns and before the next Work()
// method is invoked. // method is invoked.
pending_events.push_back(closure); pending_events.push_back(head);
} }
} else { } else {
// In this case, (1) watch_mask > 0 && r == 0 or (2) watch_mask == // In this case, (1) watch_mask > 0 && r == 0 or (2) watch_mask ==
// 0 and r < 0 or (3) watch_mask == 0 and r == 0. For case-1, no // 0 and r < 0 or (3) watch_mask == 0 and r == 0. For case-1, no
// events are pending on the fd even though the fd was polled. For // events are pending on the fd even though the fd was polled. For
// case-2 and 3, the fd was not polled // case-2 and 3, the fd was not polled
head->EndPollLocked(0, 0); head->EndPollLocked(false, false);
} }
} else { } else {
// It can enter this case if an orphan was invoked on the handle // It can enter this case if an orphan was invoked on the handle
// while it was being polled. // while it was being polled.
head->EndPollLocked(0, 0); head->EndPollLocked(false, false);
} }
lock.Release(); lock.Release();
// Unref the ref taken at BeginPollLocked. // Unref the ref taken at BeginPollLocked.
@ -770,22 +771,21 @@ Poller::WorkResult PollPoller::Work(EventEngine::Duration timeout) {
// handle while it was being polled. If watch_mask is 0, then the fd // handle while it was being polled. If watch_mask is 0, then the fd
// was not polled. // was not polled.
head->SetWatched(-1); head->SetWatched(-1);
head->EndPollLocked(0, 0); head->EndPollLocked(false, false);
} else { } else {
// Watched is true and watch_mask > 0 // Watched is true and watch_mask > 0
if (pfds[i].revents & POLLHUP) { if (pfds[i].revents & POLLHUP) {
head->SetPollhup(true); head->SetPollhup(true);
} }
head->SetWatched(-1); head->SetWatched(-1);
if (EventEngine::Closure* closure = if (head->EndPollLocked(pfds[i].revents & kPollinCheck,
head->EndPollLocked(pfds[i].revents & kPollinCheck, pfds[i].revents & kPolloutCheck)) {
pfds[i].revents & kPolloutCheck)) {
// Its safe to add to list of pending events because EndPollLocked // Its safe to add to list of pending events because EndPollLocked
// returns a +ve number only when the handle is not orphaned. // returns true only when the handle is not orphaned.
// But an orphan might be initiated on the handle after this // But an orphan might be initiated on the handle after this
// Work() method returns and before the next Work() method is // Work() method returns and before the next Work() method is
// invoked. // invoked.
pending_events.push_back(closure); pending_events.push_back(head);
} }
} }
lock.Release(); lock.Release();
@ -811,11 +811,17 @@ Poller::WorkResult PollPoller::Work(EventEngine::Duration timeout) {
mu_.Unlock(); mu_.Unlock();
if (pending_events.empty()) { if (pending_events.empty()) {
if (was_kicked_ext) { if (was_kicked_ext) {
return Poller::Kicked{}; return Poller::WorkResult::kKicked;
} }
return Poller::DeadlineExceeded{}; return Poller::WorkResult::kDeadlineExceeded;
}
// Run the provided callback synchronously.
schedule_poll_again();
// Process all pending events inline.
for (auto& it : pending_events) {
it->ExecutePendingActions();
} }
return pending_events; return Poller::WorkResult::kOk;
} }
void PollPoller::Shutdown() { void PollPoller::Shutdown() {
@ -855,7 +861,9 @@ EventHandle* PollPoller::CreateHandle(int /*fd*/, absl::string_view /*name*/,
GPR_ASSERT(false && "unimplemented"); GPR_ASSERT(false && "unimplemented");
} }
Poller::WorkResult PollPoller::Work(EventEngine::Duration /*timeout*/) { Poller::WorkResult PollPoller::Work(
EventEngine::Duration /*timeout*/,
absl::FunctionRef<void()> /*schedule_poll_again*/) {
GPR_ASSERT(false && "unimplemented"); GPR_ASSERT(false && "unimplemented");
} }

@ -22,6 +22,7 @@
#include <string> #include <string>
#include "absl/base/thread_annotations.h" #include "absl/base/thread_annotations.h"
#include "absl/functional/function_ref.h"
#include "absl/strings/string_view.h" #include "absl/strings/string_view.h"
#include "absl/synchronization/mutex.h" #include "absl/synchronization/mutex.h"
@ -44,7 +45,8 @@ class PollPoller : public PosixEventPoller {
EventHandle* CreateHandle(int fd, absl::string_view name, EventHandle* CreateHandle(int fd, absl::string_view name,
bool track_err) override; bool track_err) override;
Poller::WorkResult Work( Poller::WorkResult Work(
grpc_event_engine::experimental::EventEngine::Duration timeout) override; grpc_event_engine::experimental::EventEngine::Duration timeout,
absl::FunctionRef<void()> schedule_poll_again) override;
std::string Name() override { return "poll"; } std::string Name() override { return "poll"; }
void Kick() override; void Kick() override;
Scheduler* GetScheduler() { return scheduler_; } Scheduler* GetScheduler() { return scheduler_; }

@ -59,12 +59,13 @@ WinSocket* IOCP::Watch(SOCKET socket) {
void IOCP::Shutdown() { void IOCP::Shutdown() {
while (outstanding_kicks_.load() > 0) { while (outstanding_kicks_.load() > 0) {
Work(std::chrono::hours(42)); Work(std::chrono::hours(42), []() {});
} }
GPR_ASSERT(CloseHandle(iocp_handle_)); GPR_ASSERT(CloseHandle(iocp_handle_));
} }
Poller::WorkResult IOCP::Work(EventEngine::Duration timeout) { Poller::WorkResult IOCP::Work(EventEngine::Duration timeout,
absl::FunctionRef<void()> schedule_poll_again) {
static const absl::Status kDeadlineExceeded = absl::DeadlineExceededError( static const absl::Status kDeadlineExceeded = absl::DeadlineExceededError(
absl::StrFormat("IOCP::%p: Received no completions", this)); absl::StrFormat("IOCP::%p: Received no completions", this));
static const absl::Status kKicked = static const absl::Status kKicked =
@ -82,7 +83,7 @@ Poller::WorkResult IOCP::Work(EventEngine::Duration timeout) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_event_engine_trace)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_event_engine_trace)) {
gpr_log(GPR_DEBUG, "IOCP::%p deadline exceeded", this); gpr_log(GPR_DEBUG, "IOCP::%p deadline exceeded", this);
} }
return Poller::DeadlineExceeded{}; return Poller::WorkResult::kDeadlineExceeded;
} }
GPR_ASSERT(completion_key && overlapped); GPR_ASSERT(completion_key && overlapped);
if (overlapped == &kick_overlap_) { if (overlapped == &kick_overlap_) {
@ -91,7 +92,7 @@ Poller::WorkResult IOCP::Work(EventEngine::Duration timeout) {
} }
outstanding_kicks_.fetch_sub(1); outstanding_kicks_.fetch_sub(1);
if (completion_key == (ULONG_PTR)&kick_token_) { if (completion_key == (ULONG_PTR)&kick_token_) {
return Poller::Kicked{}; return Poller::WorkResult::kKicked;
} }
gpr_log(GPR_ERROR, "Unknown custom completion key: %p", completion_key); gpr_log(GPR_ERROR, "Unknown custom completion key: %p", completion_key);
abort(); abort();
@ -109,10 +110,15 @@ Poller::WorkResult IOCP::Work(EventEngine::Duration timeout) {
} else { } else {
info->GetOverlappedResult(); info->GetOverlappedResult();
} }
if (info->closure() != nullptr) return Events{info->closure()}; if (info->closure() != nullptr) {
schedule_poll_again();
executor_->Run(info->closure());
return Poller::WorkResult::kOk;
}
// No callback registered. Set ready and return an empty set // No callback registered. Set ready and return an empty set
info->SetReady(); info->SetReady();
return Events{}; schedule_poll_again();
return Poller::WorkResult::kOk;
} }
void IOCP::Kick() { void IOCP::Kick() {

@ -42,7 +42,8 @@ class IOCP final : public Poller {
// interface methods // interface methods
void Shutdown(); void Shutdown();
WorkResult Work(EventEngine::Duration timeout) override; WorkResult Work(EventEngine::Duration timeout,
absl::FunctionRef<void()> schedule_poll_again) override;
void Kick() override; void Kick() override;
WinSocket* Watch(SOCKET socket); WinSocket* Watch(SOCKET socket);

@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
#include <cstring>
#include <ostream> #include <ostream>
#include "absl/functional/any_invocable.h" #include "absl/functional/any_invocable.h"
@ -245,6 +246,9 @@ void ListenCb(server* sv, absl::Status status) {
[sv](absl::Status status) { ListenCb(sv, status); }); [sv](absl::Status status) { ListenCb(sv, status); });
listen_em_fd->NotifyOnRead(sv->listen_closure); listen_em_fd->NotifyOnRead(sv->listen_closure);
return; return;
} else if (fd < 0) {
gpr_log(GPR_ERROR, "Failed to acceot a connection, returned error: %s",
std::strerror(errno));
} }
EXPECT_GE(fd, 0); EXPECT_GE(fd, 0);
EXPECT_LT(fd, FD_SETSIZE); EXPECT_LT(fd, FD_SETSIZE);
@ -376,14 +380,8 @@ void WaitAndShutdown(server* sv, client* cl) {
gpr_mu_lock(&g_mu); gpr_mu_lock(&g_mu);
while (!sv->done || !cl->done) { while (!sv->done || !cl->done) {
gpr_mu_unlock(&g_mu); gpr_mu_unlock(&g_mu);
result = g_event_poller->Work(24h); result = g_event_poller->Work(24h, []() {});
if (absl::holds_alternative<Poller::Events>(result)) { ASSERT_FALSE(result == Poller::WorkResult::kDeadlineExceeded);
auto pending_events = absl::get<Poller::Events>(result);
for (auto it = pending_events.begin(); it != pending_events.end(); ++it) {
(*it)->Run();
}
pending_events.clear();
}
gpr_mu_lock(&g_mu); gpr_mu_lock(&g_mu);
} }
gpr_mu_unlock(&g_mu); gpr_mu_unlock(&g_mu);
@ -507,15 +505,8 @@ TEST_P(EventPollerTest, TestEventPollerHandleChange) {
gpr_mu_lock(&g_mu); gpr_mu_lock(&g_mu);
while (fdc->cb_that_ran == nullptr) { while (fdc->cb_that_ran == nullptr) {
gpr_mu_unlock(&g_mu); gpr_mu_unlock(&g_mu);
result = g_event_poller->Work(24h); result = g_event_poller->Work(24h, []() {});
if (absl::holds_alternative<Poller::Events>(result)) { ASSERT_FALSE(result == Poller::WorkResult::kDeadlineExceeded);
auto pending_events = absl::get<Poller::Events>(result);
for (auto it = pending_events.begin(); it != pending_events.end();
++it) {
(*it)->Run();
}
pending_events.clear();
}
gpr_mu_lock(&g_mu); gpr_mu_lock(&g_mu);
} }
}; };
@ -686,26 +677,19 @@ class Worker : public grpc_core::DualRefCounted<Worker> {
private: private:
void Work() { void Work() {
auto result = g_event_poller->Work(24h); auto result = g_event_poller->Work(24h, [this]() {
if (absl::holds_alternative<Poller::Events>(result)) {
// Schedule next work instantiation immediately and take a Ref for // Schedule next work instantiation immediately and take a Ref for
// the next instantiation. // the next instantiation.
Ref().release(); Ref().release();
scheduler_->Run([this]() { Work(); }); scheduler_->Run([this]() { Work(); });
// Process pending events of current Work(..) instantiation. });
auto pending_events = absl::get<Poller::Events>(result); ASSERT_TRUE(result == Poller::WorkResult::kOk ||
for (auto it = pending_events.begin(); it != pending_events.end(); ++it) { result == Poller::WorkResult::kKicked);
(*it)->Run(); // Corresponds to the Ref taken for the current instantiation. If the
} // result was Poller::WorkResult::kKicked, then the next work instantiation
pending_events.clear(); // would not have been scheduled and the poll_again callback should have
// Corresponds to the Ref taken for the current instantiation. // been deleted.
Unref(); Unref();
} else {
// The poller got kicked. This can only happen when all the Fds have
// orphaned themselves.
EXPECT_TRUE(absl::holds_alternative<Poller::Kicked>(result));
Unref();
}
} }
Scheduler* scheduler_; Scheduler* scheduler_;
PosixEventPoller* poller_; PosixEventPoller* poller_;

@ -114,17 +114,17 @@ TEST_F(IOCPTest, ClientReceivesNotificationOfServerSend) {
wrapped_server_socket->NotifyOnWrite(on_write); wrapped_server_socket->NotifyOnWrite(on_write);
} }
// Doing work for WSASend // Doing work for WSASend
auto work_result = iocp.Work(std::chrono::seconds(10)); bool cb_invoked = false;
ASSERT_TRUE(absl::holds_alternative<Poller::Events>(work_result)); auto work_result = iocp.Work(std::chrono::seconds(10),
Poller::Events closures = absl::get<Poller::Events>(work_result); [&cb_invoked]() { cb_invoked = true; });
ASSERT_EQ(closures.size(), 1); ASSERT_TRUE(work_result == Poller::WorkResult::kOk);
executor.Run(closures[0]); ASSERT_TRUE(cb_invoked);
// Doing work for WSARecv // Doing work for WSARecv
work_result = iocp.Work(std::chrono::seconds(10)); cb_invoked = false;
ASSERT_TRUE(absl::holds_alternative<Poller::Events>(work_result)); work_result = iocp.Work(std::chrono::seconds(10),
closures = absl::get<Poller::Events>(work_result); [&cb_invoked]() { cb_invoked = true; });
ASSERT_EQ(closures.size(), 1); ASSERT_TRUE(work_result == Poller::WorkResult::kOk);
executor.Run(closures[0]); ASSERT_TRUE(cb_invoked);
// wait for the callbacks to run // wait for the callbacks to run
ASSERT_TRUE(read_called.Get()); ASSERT_TRUE(read_called.Get());
ASSERT_TRUE(write_called.Get()); ASSERT_TRUE(write_called.Get());
@ -185,11 +185,12 @@ TEST_F(IOCPTest, IocpWorkTimeoutDueToNoNotificationRegistered) {
&write_overlapped, NULL); &write_overlapped, NULL);
EXPECT_EQ(status, 0); EXPECT_EQ(status, 0);
} }
// IOCP::Work without any notification callbacks should return no Events. // IOCP::Work without any notification callbacks should still return Ok.
auto work_result = iocp.Work(std::chrono::seconds(2)); bool cb_invoked = false;
ASSERT_TRUE(absl::holds_alternative<Poller::Events>(work_result)); auto work_result = iocp.Work(std::chrono::seconds(2),
Poller::Events closures = absl::get<Poller::Events>(work_result); [&cb_invoked]() { cb_invoked = true; });
ASSERT_EQ(closures.size(), 0); ASSERT_TRUE(work_result == Poller::WorkResult::kOk);
ASSERT_TRUE(cb_invoked);
// register the closure, which should trigger it immediately. // register the closure, which should trigger it immediately.
wrapped_client_socket->NotifyOnRead(on_read); wrapped_client_socket->NotifyOnRead(on_read);
// wait for the callbacks to run // wait for the callbacks to run
@ -205,8 +206,11 @@ TEST_F(IOCPTest, KickWorks) {
IOCP iocp(&executor); IOCP iocp(&executor);
Promise<bool> kicked{false}; Promise<bool> kicked{false};
executor.Run([&iocp, &kicked] { executor.Run([&iocp, &kicked] {
Poller::WorkResult result = iocp.Work(std::chrono::seconds(30)); bool cb_invoked = false;
ASSERT_TRUE(absl::holds_alternative<Poller::Kicked>(result)); Poller::WorkResult result = iocp.Work(
std::chrono::seconds(30), [&cb_invoked]() { cb_invoked = true; });
ASSERT_TRUE(result == Poller::WorkResult::kKicked);
ASSERT_FALSE(cb_invoked);
kicked.Set(true); kicked.Set(true);
}); });
executor.Run([&iocp] { executor.Run([&iocp] {
@ -227,14 +231,21 @@ TEST_F(IOCPTest, KickThenShutdownCasusesNextWorkerToBeKicked) {
// kick twice // kick twice
iocp.Kick(); iocp.Kick();
iocp.Kick(); iocp.Kick();
bool cb_invoked = false;
// Assert the next two WorkResults are kicks // Assert the next two WorkResults are kicks
auto result = iocp.Work(std::chrono::milliseconds(1)); auto result = iocp.Work(std::chrono::milliseconds(1),
ASSERT_TRUE(absl::holds_alternative<Poller::Kicked>(result)); [&cb_invoked]() { cb_invoked = true; });
result = iocp.Work(std::chrono::milliseconds(1)); ASSERT_TRUE(result == Poller::WorkResult::kKicked);
ASSERT_TRUE(absl::holds_alternative<Poller::Kicked>(result)); ASSERT_FALSE(cb_invoked);
result = iocp.Work(std::chrono::milliseconds(1),
[&cb_invoked]() { cb_invoked = true; });
ASSERT_TRUE(result == Poller::WorkResult::kKicked);
ASSERT_FALSE(cb_invoked);
// followed by a DeadlineExceeded // followed by a DeadlineExceeded
result = iocp.Work(std::chrono::milliseconds(1)); result = iocp.Work(std::chrono::milliseconds(1),
ASSERT_TRUE(absl::holds_alternative<Poller::DeadlineExceeded>(result)); [&cb_invoked]() { cb_invoked = true; });
ASSERT_TRUE(result == Poller::WorkResult::kDeadlineExceeded);
ASSERT_FALSE(cb_invoked);
} }
TEST_F(IOCPTest, CrashOnWatchingAClosedSocket) { TEST_F(IOCPTest, CrashOnWatchingAClosedSocket) {
@ -270,13 +281,8 @@ TEST_F(IOCPTest, StressTestThousandsOfSockets) {
std::thread iocp_worker([&iocp, &executor] { std::thread iocp_worker([&iocp, &executor] {
Poller::WorkResult result; Poller::WorkResult result;
do { do {
result = iocp.Work(std::chrono::seconds(1)); result = iocp.Work(std::chrono::seconds(1), []() {});
if (absl::holds_alternative<Poller::Events>(result)) { } while (result != Poller::WorkResult::kDeadlineExceeded);
for (auto& event : absl::get<Poller::Events>(result)) {
executor.Run(event);
}
}
} while (!absl::holds_alternative<Poller::DeadlineExceeded>(result));
}); });
for (int i = 0; i < sockets_per_thread; i++) { for (int i = 0; i < sockets_per_thread; i++) {
SOCKET sockpair[2]; SOCKET sockpair[2];

Loading…
Cancel
Save