diff --git a/BUILD b/BUILD index 93fc2b18ac0..0482ed710ca 100644 --- a/BUILD +++ b/BUILD @@ -2390,10 +2390,7 @@ grpc_cc_library( hdrs = [ "src/core/lib/event_engine/poller.h", ], - external_deps = [ - "absl/container:inlined_vector", - "absl/types:variant", - ], + external_deps = ["absl/functional:function_ref"], deps = [ "event_engine_base_hdrs", "gpr_platform", @@ -2646,7 +2643,8 @@ grpc_cc_library( ], external_deps = [ "absl/base:core_headers", - "absl/functional:any_invocable", + "absl/container:inlined_vector", + "absl/functional:function_ref", "absl/memory", "absl/status", "absl/status:statusor", @@ -2654,7 +2652,6 @@ grpc_cc_library( "absl/synchronization", ], deps = [ - "common_event_engine_closures", "event_engine_base_hdrs", "event_engine_poller", "event_engine_time_util", @@ -2678,7 +2675,9 @@ grpc_cc_library( ], external_deps = [ "absl/base:core_headers", + "absl/container:inlined_vector", "absl/functional:any_invocable", + "absl/functional:function_ref", "absl/status", "absl/status:statusor", "absl/strings", diff --git a/src/core/lib/event_engine/poller.h b/src/core/lib/event_engine/poller.h index f6008857a5a..0ef6ffc03a4 100644 --- a/src/core/lib/event_engine/poller.h +++ b/src/core/lib/event_engine/poller.h @@ -16,8 +16,7 @@ #include -#include "absl/container/inlined_vector.h" -#include "absl/types/variant.h" +#include "absl/functional/function_ref.h" #include @@ -30,20 +29,23 @@ namespace experimental { // Work(...). class Poller { public: - // This initial vector size may need to be tuned - using Events = absl::InlinedVector; - struct DeadlineExceeded {}; - struct Kicked {}; - using WorkResult = absl::variant; + enum class WorkResult { kOk, kDeadlineExceeded, kKicked }; virtual ~Poller() = default; - // Poll once for events, returning a collection of Closures to be executed. + // Poll once for events and process received events. The callback function + // "schedule_poll_again" is expected to be run synchronously prior to + // processing received events. The callback's responsibility primarily is to + // schedule Poller::Work asynchronously again. This would ensure that the next + // polling cycle would run as quickly as possible to ensure continuous + // polling. // // Returns: - // * absl::AbortedError if it was Kicked. - // * absl::DeadlineExceeded if timeout occurred - // * A collection of closures to execute, otherwise - virtual WorkResult Work(EventEngine::Duration timeout) = 0; + // * Poller::WorkResult::kKicked if it was Kicked. + // * Poller::WorkResult::kDeadlineExceeded if timeout occurred + // * Poller::WorkResult::kOk, otherwise indicating that the callback function + // was run synchonously before some events were processed. + virtual WorkResult Work(EventEngine::Duration timeout, + absl::FunctionRef schedule_poll_again) = 0; // Trigger the threads executing Work(..) to break out as soon as possible. virtual void Kick() = 0; }; diff --git a/src/core/lib/event_engine/posix_engine/ev_epoll1_linux.cc b/src/core/lib/event_engine/posix_engine/ev_epoll1_linux.cc index e6c4559e451..b9b15200f11 100644 --- a/src/core/lib/event_engine/posix_engine/ev_epoll1_linux.cc +++ b/src/core/lib/event_engine/posix_engine/ev_epoll1_linux.cc @@ -20,7 +20,6 @@ #include #include -#include "absl/functional/any_invocable.h" #include "absl/memory/memory.h" #include "absl/status/status.h" #include "absl/status/statusor.h" @@ -45,7 +44,6 @@ #include "absl/synchronization/mutex.h" -#include "src/core/lib/event_engine/common_closures.h" #include "src/core/lib/event_engine/posix_engine/event_poller.h" #include "src/core/lib/event_engine/posix_engine/lockfree_event.h" #include "src/core/lib/event_engine/posix_engine/posix_engine_closure.h" @@ -61,7 +59,6 @@ using ::grpc_event_engine::posix_engine::WakeupFd; namespace grpc_event_engine { namespace posix_engine { -using ::grpc_event_engine::experimental::AnyInvocableClosure; using ::grpc_event_engine::experimental::EventEngine; using ::grpc_event_engine::experimental::Poller; using ::grpc_event_engine::posix_engine::LockfreeEvent; @@ -73,7 +70,6 @@ class Epoll1EventHandle : public EventHandle { : fd_(fd), list_(this), poller_(poller), - exec_actions_closure_([this]() { ExecutePendingActions(); }), read_closure_(absl::make_unique(poller->GetScheduler())), write_closure_( absl::make_unique(poller->GetScheduler())), @@ -96,8 +92,8 @@ class Epoll1EventHandle : public EventHandle { pending_error_.store(false, std::memory_order_relaxed); } Epoll1Poller* Poller() { return poller_; } - EventEngine::Closure* SetPendingActions(bool pending_read, bool pending_write, - bool pending_error) { + bool SetPendingActions(bool pending_read, bool pending_write, + bool pending_error) { // Another thread may be executing ExecutePendingActions() at this point // This is possible for instance, if one instantiation of Work(..) sets // an fd to be readable while the next instantiation of Work(...) may @@ -118,10 +114,7 @@ class Epoll1EventHandle : public EventHandle { pending_error_.store(true, std::memory_order_release); } - if (pending_read || pending_write || pending_error) { - return &exec_actions_closure_; - } - return nullptr; + return pending_read || pending_write || pending_error; } int WrappedFd() override { return fd_; } void OrphanHandle(PosixEngineClosure* on_done, int* release_fd, @@ -167,7 +160,6 @@ class Epoll1EventHandle : public EventHandle { std::atomic pending_error_{false}; Epoll1Poller::HandlesList list_; Epoll1Poller* poller_; - AnyInvocableClosure exec_actions_closure_; std::unique_ptr read_closure_; std::unique_ptr write_closure_; std::unique_ptr error_closure_; @@ -432,7 +424,7 @@ EventHandle* Epoll1Poller::CreateHandle(int fd, absl::string_view /*name*/, // function. It also returns the list of closures to run to take action // on file descriptors that became readable/writable. bool Epoll1Poller::ProcessEpollEvents(int max_epoll_events_to_handle, - Poller::Events& pending_events) { + Events& pending_events) { int64_t num_events = g_epoll_set_.num_events; int64_t cursor = g_epoll_set_.cursor; bool was_kicked = false; @@ -454,10 +446,10 @@ bool Epoll1Poller::ProcessEpollEvents(int max_epoll_events_to_handle, bool read_ev = (ev->events & (EPOLLIN | EPOLLPRI)) != 0; bool write_ev = (ev->events & EPOLLOUT) != 0; bool err_fallback = error && !track_err; - if (EventEngine::Closure* closure = handle->SetPendingActions( - read_ev || cancel || err_fallback, - write_ev || cancel || err_fallback, error && !err_fallback)) { - pending_events.push_back(closure); + if (handle->SetPendingActions(read_ev || cancel || err_fallback, + write_ev || cancel || err_fallback, + error && !err_fallback)) { + pending_events.push_back(handle); } } } @@ -522,13 +514,16 @@ void Epoll1EventHandle::SetWritable() { write_closure_->SetReady(); } void Epoll1EventHandle::SetHasError() { error_closure_->SetReady(); } // Polls the registered Fds for events until timeout is reached or there is a -// Kick(). If there is a Kick(), it returns any previously un-processed events. -// If there are no un-processed events, it returns Poller::WorkResult::Kicked{} -Poller::WorkResult Epoll1Poller::Work(EventEngine::Duration timeout) { - Poller::Events pending_events; +// Kick(). If there is a Kick(), it collects and processes any previously +// un-processed events. If there are no un-processed events, it returns +// Poller::WorkResult::Kicked{} +Poller::WorkResult Epoll1Poller::Work( + EventEngine::Duration timeout, + absl::FunctionRef schedule_poll_again) { + Events pending_events; if (g_epoll_set_.cursor == g_epoll_set_.num_events) { if (DoEpollWait(timeout) == 0) { - return Poller::DeadlineExceeded{}; + return Poller::WorkResult::kDeadlineExceeded; } } { @@ -540,10 +535,16 @@ Poller::WorkResult Epoll1Poller::Work(EventEngine::Duration timeout) { was_kicked_ = false; } if (pending_events.empty()) { - return Poller::Kicked{}; + return Poller::WorkResult::kKicked; } - return pending_events; } + // Run the provided callback. + schedule_poll_again(); + // Process all pending events inline. + for (auto& it : pending_events) { + it->ExecutePendingActions(); + } + return Poller::WorkResult::kOk; } void Epoll1Poller::Kick() { @@ -589,7 +590,7 @@ EventHandle* Epoll1Poller::CreateHandle(int /*fd*/, absl::string_view /*name*/, } bool Epoll1Poller::ProcessEpollEvents(int /*max_epoll_events_to_handle*/, - Poller::Events& /*pending_events*/) { + Events& /*pending_events*/) { GPR_ASSERT(false && "unimplemented"); } @@ -597,7 +598,9 @@ int Epoll1Poller::DoEpollWait(EventEngine::Duration /*timeout*/) { GPR_ASSERT(false && "unimplemented"); } -Poller::WorkResult Epoll1Poller::Work(EventEngine::Duration /*timeout*/) { +Poller::WorkResult Epoll1Poller::Work( + EventEngine::Duration /*timeout*/, + absl::FunctionRef /*schedule_poll_again*/) { GPR_ASSERT(false && "unimplemented"); } diff --git a/src/core/lib/event_engine/posix_engine/ev_epoll1_linux.h b/src/core/lib/event_engine/posix_engine/ev_epoll1_linux.h index 582fc8d48e1..d1becd525c1 100644 --- a/src/core/lib/event_engine/posix_engine/ev_epoll1_linux.h +++ b/src/core/lib/event_engine/posix_engine/ev_epoll1_linux.h @@ -21,6 +21,8 @@ #include #include "absl/base/thread_annotations.h" +#include "absl/container/inlined_vector.h" +#include "absl/functional/function_ref.h" #include "absl/strings/string_view.h" #include "absl/synchronization/mutex.h" @@ -49,7 +51,8 @@ class Epoll1Poller : public PosixEventPoller { EventHandle* CreateHandle(int fd, absl::string_view name, bool track_err) override; Poller::WorkResult Work( - grpc_event_engine::experimental::EventEngine::Duration timeout) override; + grpc_event_engine::experimental::EventEngine::Duration timeout, + absl::FunctionRef schedule_poll_again) override; std::string Name() override { return "epoll1"; } void Kick() override; Scheduler* GetScheduler() { return scheduler_; } @@ -57,6 +60,8 @@ class Epoll1Poller : public PosixEventPoller { ~Epoll1Poller() override; private: + // This initial vector size may need to be tuned + using Events = absl::InlinedVector; // Process the epoll events found by DoEpollWait() function. // - g_epoll_set.cursor points to the index of the first event to be processed // - This function then processes up-to max_epoll_events_to_handle and @@ -65,7 +70,7 @@ class Epoll1Poller : public PosixEventPoller { // function. It also returns the list of closures to run to take action // on file descriptors that became readable/writable. bool ProcessEpollEvents(int max_epoll_events_to_handle, - Poller::Events& pending_events); + Events& pending_events); // Do epoll_wait and store the events in g_epoll_set.events field. This does // not "process" any of the events yet; that is done in ProcessEpollEvents(). // See ProcessEpollEvents() function for more details. It returns the number diff --git a/src/core/lib/event_engine/posix_engine/ev_poll_posix.cc b/src/core/lib/event_engine/posix_engine/ev_poll_posix.cc index d1be9b16999..db6256ea46a 100644 --- a/src/core/lib/event_engine/posix_engine/ev_poll_posix.cc +++ b/src/core/lib/event_engine/posix_engine/ev_poll_posix.cc @@ -24,6 +24,7 @@ #include #include +#include "absl/container/inlined_vector.h" #include "absl/functional/any_invocable.h" #include "absl/status/status.h" #include "absl/status/statusor.h" @@ -75,6 +76,7 @@ using ::grpc_event_engine::experimental::AnyInvocableClosure; using ::grpc_event_engine::experimental::EventEngine; using ::grpc_event_engine::experimental::Poller; using ::grpc_event_engine::posix_engine::WakeupFd; +using Events = absl::InlinedVector; class PollEventHandle : public EventHandle { public: @@ -102,8 +104,7 @@ class PollEventHandle : public EventHandle { poller_->PollerHandlesListAddHandle(this); } PollPoller* Poller() { return poller_; } - EventEngine::Closure* SetPendingActions(bool pending_read, - bool pending_write) { + bool SetPendingActions(bool pending_read, bool pending_write) { pending_actions_ |= pending_read; if (pending_write) { pending_actions_ |= (1 << 2); @@ -112,9 +113,9 @@ class PollEventHandle : public EventHandle { // The closure is going to be executed. We'll Unref this handle in // ExecutePendingActions. Ref(); - return &exec_actions_closure_; + return true; } - return nullptr; + return false; } void ForceRemoveHandleFromPoller() { absl::MutexLock lock(&poller_->mu_); @@ -202,7 +203,7 @@ class PollEventHandle : public EventHandle { } uint32_t BeginPollLocked(uint32_t read_mask, uint32_t write_mask) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); - EventEngine::Closure* EndPollLocked(int got_read, int got_write) + bool EndPollLocked(bool got_read, bool got_write) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); private: @@ -559,15 +560,13 @@ uint32_t PollEventHandle::BeginPollLocked(uint32_t read_mask, return mask; } -EventEngine::Closure* PollEventHandle::EndPollLocked(int got_read, - int got_write) { - EventEngine::Closure* closure = nullptr; +bool PollEventHandle::EndPollLocked(bool got_read, bool got_write) { if (is_orphaned_ && !IsWatched()) { CloseFd(); } else if (!is_orphaned_) { - closure = SetPendingActions(got_read, got_write); + return SetPendingActions(got_read, got_write); } - return closure; + return false; } void PollPoller::KickExternal(bool ext) { @@ -641,13 +640,15 @@ PollPoller::~PollPoller() { GPR_ASSERT(poll_handles_list_head_ == nullptr); } -Poller::WorkResult PollPoller::Work(EventEngine::Duration timeout) { +Poller::WorkResult PollPoller::Work( + EventEngine::Duration timeout, + absl::FunctionRef schedule_poll_again) { // Avoid malloc for small number of elements. enum { inline_elements = 96 }; struct pollfd pollfd_space[inline_elements]; bool was_kicked_ext = false; PollEventHandle* watcher_space[inline_elements]; - Poller::Events pending_events; + Events pending_events; int timeout_ms = static_cast(grpc_event_engine::experimental::Milliseconds(timeout)); mu_.Lock(); @@ -733,25 +734,25 @@ Poller::WorkResult PollPoller::Work(EventEngine::Duration timeout) { // This case implies the fd was polled (since watch_mask > 0 and // the poll returned an error. Mark the fds as both readable and // writable. - if (EventEngine::Closure* closure = head->EndPollLocked(1, 1)) { + if (head->EndPollLocked(true, true)) { // Its safe to add to list of pending events because - // EndPollLocked returns a +ve number only when the handle is + // EndPollLocked returns true only when the handle is // not orphaned. But an orphan might be initiated on the handle // after this Work() method returns and before the next Work() // method is invoked. - pending_events.push_back(closure); + pending_events.push_back(head); } } else { // In this case, (1) watch_mask > 0 && r == 0 or (2) watch_mask == // 0 and r < 0 or (3) watch_mask == 0 and r == 0. For case-1, no // events are pending on the fd even though the fd was polled. For // case-2 and 3, the fd was not polled - head->EndPollLocked(0, 0); + head->EndPollLocked(false, false); } } else { // It can enter this case if an orphan was invoked on the handle // while it was being polled. - head->EndPollLocked(0, 0); + head->EndPollLocked(false, false); } lock.Release(); // Unref the ref taken at BeginPollLocked. @@ -770,22 +771,21 @@ Poller::WorkResult PollPoller::Work(EventEngine::Duration timeout) { // handle while it was being polled. If watch_mask is 0, then the fd // was not polled. head->SetWatched(-1); - head->EndPollLocked(0, 0); + head->EndPollLocked(false, false); } else { // Watched is true and watch_mask > 0 if (pfds[i].revents & POLLHUP) { head->SetPollhup(true); } head->SetWatched(-1); - if (EventEngine::Closure* closure = - head->EndPollLocked(pfds[i].revents & kPollinCheck, - pfds[i].revents & kPolloutCheck)) { + if (head->EndPollLocked(pfds[i].revents & kPollinCheck, + pfds[i].revents & kPolloutCheck)) { // Its safe to add to list of pending events because EndPollLocked - // returns a +ve number only when the handle is not orphaned. + // returns true only when the handle is not orphaned. // But an orphan might be initiated on the handle after this // Work() method returns and before the next Work() method is // invoked. - pending_events.push_back(closure); + pending_events.push_back(head); } } lock.Release(); @@ -811,11 +811,17 @@ Poller::WorkResult PollPoller::Work(EventEngine::Duration timeout) { mu_.Unlock(); if (pending_events.empty()) { if (was_kicked_ext) { - return Poller::Kicked{}; + return Poller::WorkResult::kKicked; } - return Poller::DeadlineExceeded{}; + return Poller::WorkResult::kDeadlineExceeded; + } + // Run the provided callback synchronously. + schedule_poll_again(); + // Process all pending events inline. + for (auto& it : pending_events) { + it->ExecutePendingActions(); } - return pending_events; + return Poller::WorkResult::kOk; } void PollPoller::Shutdown() { @@ -855,7 +861,9 @@ EventHandle* PollPoller::CreateHandle(int /*fd*/, absl::string_view /*name*/, GPR_ASSERT(false && "unimplemented"); } -Poller::WorkResult PollPoller::Work(EventEngine::Duration /*timeout*/) { +Poller::WorkResult PollPoller::Work( + EventEngine::Duration /*timeout*/, + absl::FunctionRef /*schedule_poll_again*/) { GPR_ASSERT(false && "unimplemented"); } diff --git a/src/core/lib/event_engine/posix_engine/ev_poll_posix.h b/src/core/lib/event_engine/posix_engine/ev_poll_posix.h index 0b212707861..53557d9d96f 100644 --- a/src/core/lib/event_engine/posix_engine/ev_poll_posix.h +++ b/src/core/lib/event_engine/posix_engine/ev_poll_posix.h @@ -22,6 +22,7 @@ #include #include "absl/base/thread_annotations.h" +#include "absl/functional/function_ref.h" #include "absl/strings/string_view.h" #include "absl/synchronization/mutex.h" @@ -44,7 +45,8 @@ class PollPoller : public PosixEventPoller { EventHandle* CreateHandle(int fd, absl::string_view name, bool track_err) override; Poller::WorkResult Work( - grpc_event_engine::experimental::EventEngine::Duration timeout) override; + grpc_event_engine::experimental::EventEngine::Duration timeout, + absl::FunctionRef schedule_poll_again) override; std::string Name() override { return "poll"; } void Kick() override; Scheduler* GetScheduler() { return scheduler_; } diff --git a/src/core/lib/event_engine/windows/iocp.cc b/src/core/lib/event_engine/windows/iocp.cc index adbc57ae4b7..9222c68fd89 100644 --- a/src/core/lib/event_engine/windows/iocp.cc +++ b/src/core/lib/event_engine/windows/iocp.cc @@ -59,12 +59,13 @@ WinSocket* IOCP::Watch(SOCKET socket) { void IOCP::Shutdown() { while (outstanding_kicks_.load() > 0) { - Work(std::chrono::hours(42)); + Work(std::chrono::hours(42), []() {}); } GPR_ASSERT(CloseHandle(iocp_handle_)); } -Poller::WorkResult IOCP::Work(EventEngine::Duration timeout) { +Poller::WorkResult IOCP::Work(EventEngine::Duration timeout, + absl::FunctionRef schedule_poll_again) { static const absl::Status kDeadlineExceeded = absl::DeadlineExceededError( absl::StrFormat("IOCP::%p: Received no completions", this)); static const absl::Status kKicked = @@ -82,7 +83,7 @@ Poller::WorkResult IOCP::Work(EventEngine::Duration timeout) { if (GRPC_TRACE_FLAG_ENABLED(grpc_event_engine_trace)) { gpr_log(GPR_DEBUG, "IOCP::%p deadline exceeded", this); } - return Poller::DeadlineExceeded{}; + return Poller::WorkResult::kDeadlineExceeded; } GPR_ASSERT(completion_key && overlapped); if (overlapped == &kick_overlap_) { @@ -91,7 +92,7 @@ Poller::WorkResult IOCP::Work(EventEngine::Duration timeout) { } outstanding_kicks_.fetch_sub(1); if (completion_key == (ULONG_PTR)&kick_token_) { - return Poller::Kicked{}; + return Poller::WorkResult::kKicked; } gpr_log(GPR_ERROR, "Unknown custom completion key: %p", completion_key); abort(); @@ -109,10 +110,15 @@ Poller::WorkResult IOCP::Work(EventEngine::Duration timeout) { } else { info->GetOverlappedResult(); } - if (info->closure() != nullptr) return Events{info->closure()}; + if (info->closure() != nullptr) { + schedule_poll_again(); + executor_->Run(info->closure()); + return Poller::WorkResult::kOk; + } // No callback registered. Set ready and return an empty set info->SetReady(); - return Events{}; + schedule_poll_again(); + return Poller::WorkResult::kOk; } void IOCP::Kick() { diff --git a/src/core/lib/event_engine/windows/iocp.h b/src/core/lib/event_engine/windows/iocp.h index f13ec6dfe9b..57a33185f88 100644 --- a/src/core/lib/event_engine/windows/iocp.h +++ b/src/core/lib/event_engine/windows/iocp.h @@ -42,7 +42,8 @@ class IOCP final : public Poller { // interface methods void Shutdown(); - WorkResult Work(EventEngine::Duration timeout) override; + WorkResult Work(EventEngine::Duration timeout, + absl::FunctionRef schedule_poll_again) override; void Kick() override; WinSocket* Watch(SOCKET socket); diff --git a/test/core/event_engine/posix/event_poller_posix_test.cc b/test/core/event_engine/posix/event_poller_posix_test.cc index 77a6338bbd0..6e5f9ad9800 100644 --- a/test/core/event_engine/posix/event_poller_posix_test.cc +++ b/test/core/event_engine/posix/event_poller_posix_test.cc @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include #include "absl/functional/any_invocable.h" @@ -245,6 +246,9 @@ void ListenCb(server* sv, absl::Status status) { [sv](absl::Status status) { ListenCb(sv, status); }); listen_em_fd->NotifyOnRead(sv->listen_closure); return; + } else if (fd < 0) { + gpr_log(GPR_ERROR, "Failed to acceot a connection, returned error: %s", + std::strerror(errno)); } EXPECT_GE(fd, 0); EXPECT_LT(fd, FD_SETSIZE); @@ -376,14 +380,8 @@ void WaitAndShutdown(server* sv, client* cl) { gpr_mu_lock(&g_mu); while (!sv->done || !cl->done) { gpr_mu_unlock(&g_mu); - result = g_event_poller->Work(24h); - if (absl::holds_alternative(result)) { - auto pending_events = absl::get(result); - for (auto it = pending_events.begin(); it != pending_events.end(); ++it) { - (*it)->Run(); - } - pending_events.clear(); - } + result = g_event_poller->Work(24h, []() {}); + ASSERT_FALSE(result == Poller::WorkResult::kDeadlineExceeded); gpr_mu_lock(&g_mu); } gpr_mu_unlock(&g_mu); @@ -507,15 +505,8 @@ TEST_P(EventPollerTest, TestEventPollerHandleChange) { gpr_mu_lock(&g_mu); while (fdc->cb_that_ran == nullptr) { gpr_mu_unlock(&g_mu); - result = g_event_poller->Work(24h); - if (absl::holds_alternative(result)) { - auto pending_events = absl::get(result); - for (auto it = pending_events.begin(); it != pending_events.end(); - ++it) { - (*it)->Run(); - } - pending_events.clear(); - } + result = g_event_poller->Work(24h, []() {}); + ASSERT_FALSE(result == Poller::WorkResult::kDeadlineExceeded); gpr_mu_lock(&g_mu); } }; @@ -686,26 +677,19 @@ class Worker : public grpc_core::DualRefCounted { private: void Work() { - auto result = g_event_poller->Work(24h); - if (absl::holds_alternative(result)) { + auto result = g_event_poller->Work(24h, [this]() { // Schedule next work instantiation immediately and take a Ref for // the next instantiation. Ref().release(); scheduler_->Run([this]() { Work(); }); - // Process pending events of current Work(..) instantiation. - auto pending_events = absl::get(result); - for (auto it = pending_events.begin(); it != pending_events.end(); ++it) { - (*it)->Run(); - } - pending_events.clear(); - // Corresponds to the Ref taken for the current instantiation. - Unref(); - } else { - // The poller got kicked. This can only happen when all the Fds have - // orphaned themselves. - EXPECT_TRUE(absl::holds_alternative(result)); - Unref(); - } + }); + ASSERT_TRUE(result == Poller::WorkResult::kOk || + result == Poller::WorkResult::kKicked); + // Corresponds to the Ref taken for the current instantiation. If the + // result was Poller::WorkResult::kKicked, then the next work instantiation + // would not have been scheduled and the poll_again callback should have + // been deleted. + Unref(); } Scheduler* scheduler_; PosixEventPoller* poller_; diff --git a/test/core/event_engine/windows/iocp_test.cc b/test/core/event_engine/windows/iocp_test.cc index 13f2c308ff8..86c4a094273 100644 --- a/test/core/event_engine/windows/iocp_test.cc +++ b/test/core/event_engine/windows/iocp_test.cc @@ -114,17 +114,17 @@ TEST_F(IOCPTest, ClientReceivesNotificationOfServerSend) { wrapped_server_socket->NotifyOnWrite(on_write); } // Doing work for WSASend - auto work_result = iocp.Work(std::chrono::seconds(10)); - ASSERT_TRUE(absl::holds_alternative(work_result)); - Poller::Events closures = absl::get(work_result); - ASSERT_EQ(closures.size(), 1); - executor.Run(closures[0]); + bool cb_invoked = false; + auto work_result = iocp.Work(std::chrono::seconds(10), + [&cb_invoked]() { cb_invoked = true; }); + ASSERT_TRUE(work_result == Poller::WorkResult::kOk); + ASSERT_TRUE(cb_invoked); // Doing work for WSARecv - work_result = iocp.Work(std::chrono::seconds(10)); - ASSERT_TRUE(absl::holds_alternative(work_result)); - closures = absl::get(work_result); - ASSERT_EQ(closures.size(), 1); - executor.Run(closures[0]); + cb_invoked = false; + work_result = iocp.Work(std::chrono::seconds(10), + [&cb_invoked]() { cb_invoked = true; }); + ASSERT_TRUE(work_result == Poller::WorkResult::kOk); + ASSERT_TRUE(cb_invoked); // wait for the callbacks to run ASSERT_TRUE(read_called.Get()); ASSERT_TRUE(write_called.Get()); @@ -185,11 +185,12 @@ TEST_F(IOCPTest, IocpWorkTimeoutDueToNoNotificationRegistered) { &write_overlapped, NULL); EXPECT_EQ(status, 0); } - // IOCP::Work without any notification callbacks should return no Events. - auto work_result = iocp.Work(std::chrono::seconds(2)); - ASSERT_TRUE(absl::holds_alternative(work_result)); - Poller::Events closures = absl::get(work_result); - ASSERT_EQ(closures.size(), 0); + // IOCP::Work without any notification callbacks should still return Ok. + bool cb_invoked = false; + auto work_result = iocp.Work(std::chrono::seconds(2), + [&cb_invoked]() { cb_invoked = true; }); + ASSERT_TRUE(work_result == Poller::WorkResult::kOk); + ASSERT_TRUE(cb_invoked); // register the closure, which should trigger it immediately. wrapped_client_socket->NotifyOnRead(on_read); // wait for the callbacks to run @@ -205,8 +206,11 @@ TEST_F(IOCPTest, KickWorks) { IOCP iocp(&executor); Promise kicked{false}; executor.Run([&iocp, &kicked] { - Poller::WorkResult result = iocp.Work(std::chrono::seconds(30)); - ASSERT_TRUE(absl::holds_alternative(result)); + bool cb_invoked = false; + Poller::WorkResult result = iocp.Work( + std::chrono::seconds(30), [&cb_invoked]() { cb_invoked = true; }); + ASSERT_TRUE(result == Poller::WorkResult::kKicked); + ASSERT_FALSE(cb_invoked); kicked.Set(true); }); executor.Run([&iocp] { @@ -227,14 +231,21 @@ TEST_F(IOCPTest, KickThenShutdownCasusesNextWorkerToBeKicked) { // kick twice iocp.Kick(); iocp.Kick(); + bool cb_invoked = false; // Assert the next two WorkResults are kicks - auto result = iocp.Work(std::chrono::milliseconds(1)); - ASSERT_TRUE(absl::holds_alternative(result)); - result = iocp.Work(std::chrono::milliseconds(1)); - ASSERT_TRUE(absl::holds_alternative(result)); + auto result = iocp.Work(std::chrono::milliseconds(1), + [&cb_invoked]() { cb_invoked = true; }); + ASSERT_TRUE(result == Poller::WorkResult::kKicked); + ASSERT_FALSE(cb_invoked); + result = iocp.Work(std::chrono::milliseconds(1), + [&cb_invoked]() { cb_invoked = true; }); + ASSERT_TRUE(result == Poller::WorkResult::kKicked); + ASSERT_FALSE(cb_invoked); // followed by a DeadlineExceeded - result = iocp.Work(std::chrono::milliseconds(1)); - ASSERT_TRUE(absl::holds_alternative(result)); + result = iocp.Work(std::chrono::milliseconds(1), + [&cb_invoked]() { cb_invoked = true; }); + ASSERT_TRUE(result == Poller::WorkResult::kDeadlineExceeded); + ASSERT_FALSE(cb_invoked); } TEST_F(IOCPTest, CrashOnWatchingAClosedSocket) { @@ -270,13 +281,8 @@ TEST_F(IOCPTest, StressTestThousandsOfSockets) { std::thread iocp_worker([&iocp, &executor] { Poller::WorkResult result; do { - result = iocp.Work(std::chrono::seconds(1)); - if (absl::holds_alternative(result)) { - for (auto& event : absl::get(result)) { - executor.Run(event); - } - } - } while (!absl::holds_alternative(result)); + result = iocp.Work(std::chrono::seconds(1), []() {}); + } while (result != Poller::WorkResult::kDeadlineExceeded); }); for (int i = 0; i < sockets_per_thread; i++) { SOCKET sockpair[2];