Revert "[EventEngine] Return correct value EventEngine::IsWorkerThread and use this value in completion queue handling to correctly offload work" (#32656)

Reverts grpc/grpc#32637

Apologies for not catching this before submission.

I continue to think that IsWorkerThread as an EventEngine member
function is a poor API choice: it implies that the thread is a thread
for the referenced event engine, and that's not what we're returning,
and also rarely useful.

If we want this concept in the API it should be a static function with a
way of registering a thread into the set. I however firmly believe we'll
do better long term if we abandon the concept of being able to identify
from outside who created a thread and instead solve problems in other
ways.
pull/32664/head
Craig Tiller 2 years ago committed by GitHub
parent 522bed8cc6
commit 91d64a3774
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      src/core/lib/event_engine/cf_engine/cf_engine.cc
  2. 2
      src/core/lib/event_engine/posix_engine/ev_epoll1_linux.cc
  3. 4
      src/core/lib/event_engine/posix_engine/posix_engine.cc
  4. 4
      src/core/lib/event_engine/windows/windows_engine.cc
  5. 8
      src/core/lib/surface/completion_queue.cc

@ -82,7 +82,7 @@ bool CFEventEngine::CancelConnect(ConnectionHandle /* handle */) {
grpc_core::Crash("unimplemented"); grpc_core::Crash("unimplemented");
} }
bool CFEventEngine::IsWorkerThread() { return executor_->IsThreadPoolThread(); } bool CFEventEngine::IsWorkerThread() { grpc_core::Crash("unimplemented"); }
std::unique_ptr<EventEngine::DNSResolver> CFEventEngine::GetDNSResolver( std::unique_ptr<EventEngine::DNSResolver> CFEventEngine::GetDNSResolver(
const DNSResolver::ResolverOptions& /* options */) { const DNSResolver::ResolverOptions& /* options */) {

@ -349,7 +349,7 @@ void Epoll1EventHandle::HandleShutdownInternal(absl::Status why,
} }
} }
write_closure_->SetShutdown(why); write_closure_->SetShutdown(why);
error_closure_->SetShutdown(why); write_closure_->SetShutdown(why);
} }
} }

@ -487,9 +487,7 @@ std::unique_ptr<EventEngine::DNSResolver> PosixEventEngine::GetDNSResolver(
grpc_core::Crash("unimplemented"); grpc_core::Crash("unimplemented");
} }
bool PosixEventEngine::IsWorkerThread() { bool PosixEventEngine::IsWorkerThread() { grpc_core::Crash("unimplemented"); }
return executor_->IsThreadPoolThread();
}
bool PosixEventEngine::CancelConnect(EventEngine::ConnectionHandle handle) { bool PosixEventEngine::CancelConnect(EventEngine::ConnectionHandle handle) {
#ifdef GRPC_POSIX_SOCKET_TCP #ifdef GRPC_POSIX_SOCKET_TCP

@ -181,9 +181,7 @@ std::unique_ptr<EventEngine::DNSResolver> WindowsEventEngine::GetDNSResolver(
grpc_core::Crash("unimplemented"); grpc_core::Crash("unimplemented");
} }
bool WindowsEventEngine::IsWorkerThread() { bool WindowsEventEngine::IsWorkerThread() { grpc_core::Crash("unimplemented"); }
return executor_->IsThreadPoolThread();
}
void WindowsEventEngine::OnConnectCompleted( void WindowsEventEngine::OnConnectCompleted(
std::shared_ptr<ConnectionState> state) { std::shared_ptr<ConnectionState> state) {

@ -25,7 +25,6 @@
#include <algorithm> #include <algorithm>
#include <atomic> #include <atomic>
#include <initializer_list> #include <initializer_list>
#include <memory>
#include <new> #include <new>
#include <string> #include <string>
#include <utility> #include <utility>
@ -35,7 +34,6 @@
#include "absl/strings/str_format.h" #include "absl/strings/str_format.h"
#include "absl/strings/str_join.h" #include "absl/strings/str_join.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/grpc.h> #include <grpc/grpc.h>
#include <grpc/support/alloc.h> #include <grpc/support/alloc.h>
#include <grpc/support/atm.h> #include <grpc/support/atm.h>
@ -45,7 +43,6 @@
#include "src/core/lib/debug/stats.h" #include "src/core/lib/debug/stats.h"
#include "src/core/lib/debug/stats_data.h" #include "src/core/lib/debug/stats_data.h"
#include "src/core/lib/event_engine/default_event_engine.h"
#include "src/core/lib/gpr/spinlock.h" #include "src/core/lib/gpr/spinlock.h"
#include "src/core/lib/gprpp/atomic_utils.h" #include "src/core/lib/gprpp/atomic_utils.h"
#include "src/core/lib/gprpp/debug_location.h" #include "src/core/lib/gprpp/debug_location.h"
@ -861,11 +858,8 @@ static void cq_end_op_for_callback(
// 2. The callback is marked inlineable and there is an ACEC available // 2. The callback is marked inlineable and there is an ACEC available
// 3. We are already running in a background poller thread (which always has // 3. We are already running in a background poller thread (which always has
// an ACEC available at the base of the stack). // an ACEC available at the base of the stack).
// 4. We are running in an event engine thread with ACEC available.
auto* functor = static_cast<grpc_completion_queue_functor*>(tag); auto* functor = static_cast<grpc_completion_queue_functor*>(tag);
if (((internal || functor->inlineable || if (((internal || functor->inlineable) &&
grpc_event_engine::experimental::GetDefaultEventEngine()
->IsWorkerThread()) &&
grpc_core::ApplicationCallbackExecCtx::Available()) || grpc_core::ApplicationCallbackExecCtx::Available()) ||
grpc_iomgr_is_any_background_poller_thread()) { grpc_iomgr_is_any_background_poller_thread()) {
grpc_core::ApplicationCallbackExecCtx::Enqueue(functor, (error.ok())); grpc_core::ApplicationCallbackExecCtx::Enqueue(functor, (error.ok()));

Loading…
Cancel
Save