From 7646308547ef90f15445c85e1cefcf1e8ab44d4b Mon Sep 17 00:00:00 2001 From: Vignesh Babu Date: Sat, 18 Mar 2023 04:32:26 +0000 Subject: [PATCH] [EventEngine] Return correct value EventEngine::IsWorkerThread and use this value in completion queue handling to correctly offload work (#32637) --- src/core/lib/event_engine/cf_engine/cf_engine.cc | 2 +- src/core/lib/event_engine/posix_engine/ev_epoll1_linux.cc | 2 +- src/core/lib/event_engine/posix_engine/posix_engine.cc | 4 +++- src/core/lib/event_engine/windows/windows_engine.cc | 4 +++- src/core/lib/surface/completion_queue.cc | 8 +++++++- 5 files changed, 15 insertions(+), 5 deletions(-) diff --git a/src/core/lib/event_engine/cf_engine/cf_engine.cc b/src/core/lib/event_engine/cf_engine/cf_engine.cc index 4a33d84d005..196488f1906 100644 --- a/src/core/lib/event_engine/cf_engine/cf_engine.cc +++ b/src/core/lib/event_engine/cf_engine/cf_engine.cc @@ -82,7 +82,7 @@ bool CFEventEngine::CancelConnect(ConnectionHandle /* handle */) { grpc_core::Crash("unimplemented"); } -bool CFEventEngine::IsWorkerThread() { grpc_core::Crash("unimplemented"); } +bool CFEventEngine::IsWorkerThread() { return executor_->IsThreadPoolThread(); } std::unique_ptr CFEventEngine::GetDNSResolver( const DNSResolver::ResolverOptions& /* options */) { diff --git a/src/core/lib/event_engine/posix_engine/ev_epoll1_linux.cc b/src/core/lib/event_engine/posix_engine/ev_epoll1_linux.cc index 0d117229424..b66029b247b 100644 --- a/src/core/lib/event_engine/posix_engine/ev_epoll1_linux.cc +++ b/src/core/lib/event_engine/posix_engine/ev_epoll1_linux.cc @@ -349,7 +349,7 @@ void Epoll1EventHandle::HandleShutdownInternal(absl::Status why, } } write_closure_->SetShutdown(why); - write_closure_->SetShutdown(why); + error_closure_->SetShutdown(why); } } diff --git a/src/core/lib/event_engine/posix_engine/posix_engine.cc b/src/core/lib/event_engine/posix_engine/posix_engine.cc index 31a7cf6ed6f..995a8a2fddb 100644 --- a/src/core/lib/event_engine/posix_engine/posix_engine.cc +++ b/src/core/lib/event_engine/posix_engine/posix_engine.cc @@ -487,7 +487,9 @@ std::unique_ptr PosixEventEngine::GetDNSResolver( grpc_core::Crash("unimplemented"); } -bool PosixEventEngine::IsWorkerThread() { grpc_core::Crash("unimplemented"); } +bool PosixEventEngine::IsWorkerThread() { + return executor_->IsThreadPoolThread(); +} bool PosixEventEngine::CancelConnect(EventEngine::ConnectionHandle handle) { #ifdef GRPC_POSIX_SOCKET_TCP diff --git a/src/core/lib/event_engine/windows/windows_engine.cc b/src/core/lib/event_engine/windows/windows_engine.cc index 86167ee255f..6dd3f10b67e 100644 --- a/src/core/lib/event_engine/windows/windows_engine.cc +++ b/src/core/lib/event_engine/windows/windows_engine.cc @@ -181,7 +181,9 @@ std::unique_ptr WindowsEventEngine::GetDNSResolver( grpc_core::Crash("unimplemented"); } -bool WindowsEventEngine::IsWorkerThread() { grpc_core::Crash("unimplemented"); } +bool WindowsEventEngine::IsWorkerThread() { + return executor_->IsThreadPoolThread(); +} void WindowsEventEngine::OnConnectCompleted( std::shared_ptr state) { diff --git a/src/core/lib/surface/completion_queue.cc b/src/core/lib/surface/completion_queue.cc index aaa0caf811b..c68a6039d2f 100644 --- a/src/core/lib/surface/completion_queue.cc +++ b/src/core/lib/surface/completion_queue.cc @@ -25,6 +25,7 @@ #include #include #include +#include #include #include #include @@ -34,6 +35,7 @@ #include "absl/strings/str_format.h" #include "absl/strings/str_join.h" +#include #include #include #include @@ -43,6 +45,7 @@ #include "src/core/lib/debug/stats.h" #include "src/core/lib/debug/stats_data.h" +#include "src/core/lib/event_engine/default_event_engine.h" #include "src/core/lib/gpr/spinlock.h" #include "src/core/lib/gprpp/atomic_utils.h" #include "src/core/lib/gprpp/debug_location.h" @@ -858,8 +861,11 @@ static void cq_end_op_for_callback( // 2. The callback is marked inlineable and there is an ACEC available // 3. We are already running in a background poller thread (which always has // an ACEC available at the base of the stack). + // 4. We are running in an event engine thread with ACEC available. auto* functor = static_cast(tag); - if (((internal || functor->inlineable) && + if (((internal || functor->inlineable || + grpc_event_engine::experimental::GetDefaultEventEngine() + ->IsWorkerThread()) && grpc_core::ApplicationCallbackExecCtx::Available()) || grpc_iomgr_is_any_background_poller_thread()) { grpc_core::ApplicationCallbackExecCtx::Enqueue(functor, (error.ok()));