[EventEngine] Return correct value EventEngine::IsWorkerThread and use this value in completion queue handling to correctly offload work (#32637)

pull/32648/merge
Vignesh Babu 2 years ago committed by GitHub
parent 4c5771d27c
commit 7646308547
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      src/core/lib/event_engine/cf_engine/cf_engine.cc
  2. 2
      src/core/lib/event_engine/posix_engine/ev_epoll1_linux.cc
  3. 4
      src/core/lib/event_engine/posix_engine/posix_engine.cc
  4. 4
      src/core/lib/event_engine/windows/windows_engine.cc
  5. 8
      src/core/lib/surface/completion_queue.cc

@ -82,7 +82,7 @@ bool CFEventEngine::CancelConnect(ConnectionHandle /* handle */) {
grpc_core::Crash("unimplemented");
}
bool CFEventEngine::IsWorkerThread() { grpc_core::Crash("unimplemented"); }
bool CFEventEngine::IsWorkerThread() { return executor_->IsThreadPoolThread(); }
std::unique_ptr<EventEngine::DNSResolver> CFEventEngine::GetDNSResolver(
const DNSResolver::ResolverOptions& /* options */) {

@ -349,7 +349,7 @@ void Epoll1EventHandle::HandleShutdownInternal(absl::Status why,
}
}
write_closure_->SetShutdown(why);
write_closure_->SetShutdown(why);
error_closure_->SetShutdown(why);
}
}

@ -487,7 +487,9 @@ std::unique_ptr<EventEngine::DNSResolver> PosixEventEngine::GetDNSResolver(
grpc_core::Crash("unimplemented");
}
bool PosixEventEngine::IsWorkerThread() { grpc_core::Crash("unimplemented"); }
bool PosixEventEngine::IsWorkerThread() {
return executor_->IsThreadPoolThread();
}
bool PosixEventEngine::CancelConnect(EventEngine::ConnectionHandle handle) {
#ifdef GRPC_POSIX_SOCKET_TCP

@ -181,7 +181,9 @@ std::unique_ptr<EventEngine::DNSResolver> WindowsEventEngine::GetDNSResolver(
grpc_core::Crash("unimplemented");
}
bool WindowsEventEngine::IsWorkerThread() { grpc_core::Crash("unimplemented"); }
bool WindowsEventEngine::IsWorkerThread() {
return executor_->IsThreadPoolThread();
}
void WindowsEventEngine::OnConnectCompleted(
std::shared_ptr<ConnectionState> state) {

@ -25,6 +25,7 @@
#include <algorithm>
#include <atomic>
#include <initializer_list>
#include <memory>
#include <new>
#include <string>
#include <utility>
@ -34,6 +35,7 @@
#include "absl/strings/str_format.h"
#include "absl/strings/str_join.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/atm.h>
@ -43,6 +45,7 @@
#include "src/core/lib/debug/stats.h"
#include "src/core/lib/debug/stats_data.h"
#include "src/core/lib/event_engine/default_event_engine.h"
#include "src/core/lib/gpr/spinlock.h"
#include "src/core/lib/gprpp/atomic_utils.h"
#include "src/core/lib/gprpp/debug_location.h"
@ -858,8 +861,11 @@ static void cq_end_op_for_callback(
// 2. The callback is marked inlineable and there is an ACEC available
// 3. We are already running in a background poller thread (which always has
// an ACEC available at the base of the stack).
// 4. We are running in an event engine thread with ACEC available.
auto* functor = static_cast<grpc_completion_queue_functor*>(tag);
if (((internal || functor->inlineable) &&
if (((internal || functor->inlineable ||
grpc_event_engine::experimental::GetDefaultEventEngine()
->IsWorkerThread()) &&
grpc_core::ApplicationCallbackExecCtx::Available()) ||
grpc_iomgr_is_any_background_poller_thread()) {
grpc_core::ApplicationCallbackExecCtx::Enqueue(functor, (error.ok()));

Loading…
Cancel
Save