[EventEngine] Skip legacy fork handling for ExecCtx when in an EventEngine thread (#32229)

* [EventEngine] Implement EventEngine::IsWorkerThread()

* lighter-weight thread pool check

* add lightweight thread_local flag for EventEngine/iomgr fork

* generate_projects; add files

* fix

* back out EE implementation changes

* better description

* fix
pull/32272/head
AJ Heller 2 years ago committed by GitHub
parent 0893fca089
commit a9e2ef199e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      BUILD
  2. 1
      CMakeLists.txt
  3. 1
      Makefile
  4. 2
      build_autogenerated.yaml
  5. 1
      config.m4
  6. 1
      config.w32
  7. 2
      gRPC-C++.podspec
  8. 3
      gRPC-Core.podspec
  9. 2
      grpc.gemspec
  10. 1
      grpc.gyp
  11. 2
      package.xml
  12. 8
      src/core/BUILD
  13. 29
      src/core/lib/event_engine/thread_local.cc
  14. 32
      src/core/lib/event_engine/thread_local.h
  15. 17
      src/core/lib/event_engine/thread_pool.cc
  16. 3
      src/core/lib/event_engine/thread_pool.h
  17. 7
      src/core/lib/gprpp/fork.cc
  18. 1
      src/python/grpcio/grpc_core_dependencies.py
  19. 2
      tools/doxygen/Doxyfile.c++.internal
  20. 2
      tools/doxygen/Doxyfile.core.internal

@ -719,6 +719,7 @@ grpc_cc_library(
"debug_location",
"//src/core:construct_destruct",
"//src/core:env",
"//src/core:event_engine_thread_local",
"//src/core:examine_stack",
"//src/core:gpr_atm",
"//src/core:no_destruct",

1
CMakeLists.txt generated

@ -1551,6 +1551,7 @@ target_link_libraries(end2end_tests
endif()
add_library(gpr
src/core/lib/event_engine/thread_local.cc
src/core/lib/gpr/alloc.cc
src/core/lib/gpr/atm.cc
src/core/lib/gpr/cpu_iphone.cc

1
Makefile generated

@ -840,6 +840,7 @@ endif
# start of build recipe for library "gpr" (generated by makelib(lib) template function)
LIBGPR_SRC = \
src/core/lib/event_engine/thread_local.cc \
src/core/lib/gpr/alloc.cc \
src/core/lib/gpr/atm.cc \
src/core/lib/gpr/cpu_iphone.cc \

@ -170,6 +170,7 @@ libs:
- include/grpc/support/thd_id.h
- include/grpc/support/time.h
headers:
- src/core/lib/event_engine/thread_local.h
- src/core/lib/gpr/alloc.h
- src/core/lib/gpr/string.h
- src/core/lib/gpr/time_precise.h
@ -196,6 +197,7 @@ libs:
- src/core/lib/gprpp/thd.h
- src/core/lib/gprpp/time_util.h
src:
- src/core/lib/event_engine/thread_local.cc
- src/core/lib/gpr/alloc.cc
- src/core/lib/gpr/atm.cc
- src/core/lib/gpr/cpu_iphone.cc

1
config.m4 generated

@ -532,6 +532,7 @@ if test "$PHP_GRPC" != "no"; then
src/core/lib/event_engine/slice.cc \
src/core/lib/event_engine/slice_buffer.cc \
src/core/lib/event_engine/tcp_socket_utils.cc \
src/core/lib/event_engine/thread_local.cc \
src/core/lib/event_engine/thread_pool.cc \
src/core/lib/event_engine/time_util.cc \
src/core/lib/event_engine/trace.cc \

1
config.w32 generated

@ -498,6 +498,7 @@ if (PHP_GRPC != "no") {
"src\\core\\lib\\event_engine\\slice.cc " +
"src\\core\\lib\\event_engine\\slice_buffer.cc " +
"src\\core\\lib\\event_engine\\tcp_socket_utils.cc " +
"src\\core\\lib\\event_engine\\thread_local.cc " +
"src\\core\\lib\\event_engine\\thread_pool.cc " +
"src\\core\\lib\\event_engine\\time_util.cc " +
"src\\core\\lib\\event_engine\\trace.cc " +

2
gRPC-C++.podspec generated

@ -766,6 +766,7 @@ Pod::Spec.new do |s|
'src/core/lib/event_engine/shim.h',
'src/core/lib/event_engine/socket_notifier.h',
'src/core/lib/event_engine/tcp_socket_utils.h',
'src/core/lib/event_engine/thread_local.h',
'src/core/lib/event_engine/thread_pool.h',
'src/core/lib/event_engine/time_util.h',
'src/core/lib/event_engine/trace.h',
@ -1698,6 +1699,7 @@ Pod::Spec.new do |s|
'src/core/lib/event_engine/shim.h',
'src/core/lib/event_engine/socket_notifier.h',
'src/core/lib/event_engine/tcp_socket_utils.h',
'src/core/lib/event_engine/thread_local.h',
'src/core/lib/event_engine/thread_pool.h',
'src/core/lib/event_engine/time_util.h',
'src/core/lib/event_engine/trace.h',

3
gRPC-Core.podspec generated

@ -1187,6 +1187,8 @@ Pod::Spec.new do |s|
'src/core/lib/event_engine/socket_notifier.h',
'src/core/lib/event_engine/tcp_socket_utils.cc',
'src/core/lib/event_engine/tcp_socket_utils.h',
'src/core/lib/event_engine/thread_local.cc',
'src/core/lib/event_engine/thread_local.h',
'src/core/lib/event_engine/thread_pool.cc',
'src/core/lib/event_engine/thread_pool.h',
'src/core/lib/event_engine/time_util.cc',
@ -2382,6 +2384,7 @@ Pod::Spec.new do |s|
'src/core/lib/event_engine/shim.h',
'src/core/lib/event_engine/socket_notifier.h',
'src/core/lib/event_engine/tcp_socket_utils.h',
'src/core/lib/event_engine/thread_local.h',
'src/core/lib/event_engine/thread_pool.h',
'src/core/lib/event_engine/time_util.h',
'src/core/lib/event_engine/trace.h',

2
grpc.gemspec generated

@ -1096,6 +1096,8 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/event_engine/socket_notifier.h )
s.files += %w( src/core/lib/event_engine/tcp_socket_utils.cc )
s.files += %w( src/core/lib/event_engine/tcp_socket_utils.h )
s.files += %w( src/core/lib/event_engine/thread_local.cc )
s.files += %w( src/core/lib/event_engine/thread_local.h )
s.files += %w( src/core/lib/event_engine/thread_pool.cc )
s.files += %w( src/core/lib/event_engine/thread_pool.h )
s.files += %w( src/core/lib/event_engine/time_util.cc )

1
grpc.gyp generated

@ -305,6 +305,7 @@
'absl/types:variant',
],
'sources': [
'src/core/lib/event_engine/thread_local.cc',
'src/core/lib/gpr/alloc.cc',
'src/core/lib/gpr/atm.cc',
'src/core/lib/gpr/cpu_iphone.cc',

2
package.xml generated

@ -1078,6 +1078,8 @@
<file baseinstalldir="/" name="src/core/lib/event_engine/socket_notifier.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/tcp_socket_utils.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/tcp_socket_utils.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/thread_local.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/thread_local.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/thread_pool.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/thread_pool.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/time_util.cc" role="src" />

@ -1471,6 +1471,13 @@ grpc_cc_library(
],
)
grpc_cc_library(
name = "event_engine_thread_local",
srcs = ["lib/event_engine/thread_local.cc"],
hdrs = ["lib/event_engine/thread_local.h"],
deps = ["//:gpr_platform"],
)
grpc_cc_library(
name = "event_engine_thread_pool",
srcs = ["lib/event_engine/thread_pool.cc"],
@ -1484,6 +1491,7 @@ grpc_cc_library(
],
deps = [
"event_engine_executor",
"event_engine_thread_local",
"forkable",
"time",
"useful",

@ -0,0 +1,29 @@
// Copyright 2023 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <grpc/support/port_platform.h>
#include "src/core/lib/event_engine/thread_local.h"
namespace grpc_event_engine {
namespace experimental {
namespace {
thread_local bool g_thread_local{false};
} // namespace
void ThreadLocal::SetIsEventEngineThread(bool is) { g_thread_local = is; }
bool ThreadLocal::IsEventEngineThread() { return g_thread_local; }
} // namespace experimental
} // namespace grpc_event_engine

@ -0,0 +1,32 @@
// Copyright 2023 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_SRC_CORE_LIB_EVENT_ENGINE_THREAD_LOCAL_H
#define GRPC_SRC_CORE_LIB_EVENT_ENGINE_THREAD_LOCAL_H
#include <grpc/support/port_platform.h>
namespace grpc_event_engine {
namespace experimental {
/// A lightweight facility to allow gpr's fork handlers and
/// EventEngine::Forkables to coordinate.
class ThreadLocal {
public:
static void SetIsEventEngineThread(bool is_local);
static bool IsEventEngineThread();
};
} // namespace experimental
} // namespace grpc_event_engine
#endif // GRPC_SRC_CORE_LIB_EVENT_ENGINE_THREAD_LOCAL_H

@ -30,18 +30,13 @@
#include <grpc/support/log.h>
#include "src/core/lib/event_engine/thread_local.h"
#include "src/core/lib/gprpp/thd.h"
#include "src/core/lib/gprpp/time.h"
namespace grpc_event_engine {
namespace experimental {
namespace {
// TODO(drfloob): Remove this, and replace it with the WorkQueue* for the
// current thread (with nullptr indicating not a threadpool thread).
thread_local bool g_threadpool_thread;
} // namespace
void ThreadPool::StartThread(StatePtr state, StartThreadReason reason) {
state->thread_count.Add();
const auto now = grpc_core::Timestamp::Now();
@ -76,7 +71,7 @@ void ThreadPool::StartThread(StatePtr state, StartThreadReason reason) {
"event_engine",
[](void* arg) {
std::unique_ptr<ThreadArg> a(static_cast<ThreadArg*>(arg));
g_threadpool_thread = true;
ThreadLocal::SetIsEventEngineThread(true);
switch (a->reason) {
case StartThreadReason::kInitialPool:
break;
@ -148,14 +143,18 @@ ThreadPool::ThreadPool() {
}
}
bool ThreadPool::IsThreadPoolThread() {
return ThreadLocal::IsEventEngineThread();
}
void ThreadPool::Quiesce() {
state_->queue.SetShutdown();
// Wait until all threads are exited.
// Note that if this is a threadpool thread then we won't exit this thread
// until the callstack unwinds a little, so we need to wait for just one
// thread running instead of zero.
state_->thread_count.BlockUntilThreadCount(g_threadpool_thread ? 1 : 0,
"shutting down");
state_->thread_count.BlockUntilThreadCount(
ThreadLocal::IsEventEngineThread() ? 1 : 0, "shutting down");
quiesced_.store(true, std::memory_order_relaxed);
}

@ -59,6 +59,9 @@ class ThreadPool final : public Forkable, public Executor {
void PostforkParent() override;
void PostforkChild() override;
// Returns true if the current thread is a thread pool thread.
static bool IsThreadPoolThread();
private:
class Queue {
public:

@ -24,6 +24,7 @@
#include <grpc/support/sync.h>
#include <grpc/support/time.h>
#include "src/core/lib/event_engine/thread_local.h"
#include "src/core/lib/gprpp/global_config_env.h"
#include "src/core/lib/gprpp/no_destruct.h"
@ -63,6 +64,12 @@ class ExecCtxState {
}
void IncExecCtxCount() {
// EventEngine is expected to terminate all threads before fork, and so this
// extra work is unnecessary
if (grpc_event_engine::experimental::ThreadLocal::IsEventEngineThread()) {
gpr_atm_no_barrier_fetch_add(&count_, 1);
return;
}
gpr_atm count = gpr_atm_no_barrier_load(&count_);
while (true) {
if (count <= BLOCKED(1)) {

@ -507,6 +507,7 @@ CORE_SOURCE_FILES = [
'src/core/lib/event_engine/slice.cc',
'src/core/lib/event_engine/slice_buffer.cc',
'src/core/lib/event_engine/tcp_socket_utils.cc',
'src/core/lib/event_engine/thread_local.cc',
'src/core/lib/event_engine/thread_pool.cc',
'src/core/lib/event_engine/time_util.cc',
'src/core/lib/event_engine/trace.cc',

@ -2091,6 +2091,8 @@ src/core/lib/event_engine/slice_buffer.cc \
src/core/lib/event_engine/socket_notifier.h \
src/core/lib/event_engine/tcp_socket_utils.cc \
src/core/lib/event_engine/tcp_socket_utils.h \
src/core/lib/event_engine/thread_local.cc \
src/core/lib/event_engine/thread_local.h \
src/core/lib/event_engine/thread_pool.cc \
src/core/lib/event_engine/thread_pool.h \
src/core/lib/event_engine/time_util.cc \

@ -1869,6 +1869,8 @@ src/core/lib/event_engine/slice_buffer.cc \
src/core/lib/event_engine/socket_notifier.h \
src/core/lib/event_engine/tcp_socket_utils.cc \
src/core/lib/event_engine/tcp_socket_utils.h \
src/core/lib/event_engine/thread_local.cc \
src/core/lib/event_engine/thread_local.h \
src/core/lib/event_engine/thread_pool.cc \
src/core/lib/event_engine/thread_pool.h \
src/core/lib/event_engine/time_util.cc \

Loading…
Cancel
Save