EventEngine Forkables (#30473)

A (currently) pthread_atfork-based fork support mechanism, allowing EventEngines - or any other object that wants to implement the Forkable interface - respond to forks.
pull/30546/head
AJ Heller 2 years ago committed by GitHub
parent 5d0e744da6
commit 0c5b6171ad
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 28
      BUILD
  2. 38
      CMakeLists.txt
  3. 2
      Makefile
  4. 13
      build_autogenerated.yaml
  5. 1
      config.m4
  6. 1
      config.w32
  7. 2
      gRPC-C++.podspec
  8. 3
      gRPC-Core.podspec
  9. 2
      grpc.gemspec
  10. 2
      grpc.gyp
  11. 2
      package.xml
  12. 98
      src/core/lib/event_engine/forkable.cc
  13. 61
      src/core/lib/event_engine/forkable.h
  14. 2
      src/core/lib/event_engine/posix_engine/posix_engine.cc
  15. 2
      src/core/lib/event_engine/posix_engine/posix_engine.h
  16. 43
      src/core/lib/event_engine/posix_engine/timer_manager.cc
  17. 13
      src/core/lib/event_engine/posix_engine/timer_manager.h
  18. 43
      src/core/lib/event_engine/thread_pool.cc
  19. 14
      src/core/lib/event_engine/thread_pool.h
  20. 2
      src/core/lib/surface/init.cc
  21. 1
      src/python/grpcio/grpc_core_dependencies.py
  22. 15
      test/core/event_engine/BUILD
  23. 103
      test/core/event_engine/forkable_test.cc
  24. 2
      tools/doxygen/Doxyfile.c++.internal
  25. 2
      tools/doxygen/Doxyfile.core.internal
  26. 24
      tools/run_tests/generated/tests.json

28
BUILD

@ -417,6 +417,7 @@ grpc_cc_library(
"channel_stack_type",
"config",
"default_event_engine",
"forkable",
"gpr_base",
"grpc_authorization_base",
"grpc_base",
@ -480,6 +481,7 @@ grpc_cc_library(
"channel_stack_type",
"config",
"default_event_engine",
"forkable",
"gpr_base",
"grpc_authorization_base",
"grpc_base",
@ -2221,6 +2223,21 @@ grpc_cc_library(
deps = ["gpr_base"],
)
grpc_cc_library(
name = "forkable",
srcs = [
"src/core/lib/event_engine/forkable.cc",
],
hdrs = [
"src/core/lib/event_engine/forkable.h",
],
external_deps = ["absl/container:flat_hash_set"],
deps = [
"gpr_base",
"gpr_platform",
],
)
grpc_cc_library(
name = "event_engine_poller",
hdrs = [
@ -2314,8 +2331,14 @@ grpc_cc_library(
hdrs = [
"src/core/lib/event_engine/thread_pool.h",
],
external_deps = ["absl/functional:any_invocable"],
deps = ["gpr_base"],
external_deps = [
"absl/base:core_headers",
"absl/functional:any_invocable",
],
deps = [
"forkable",
"gpr_base",
],
)
grpc_cc_library(
@ -2332,6 +2355,7 @@ grpc_cc_library(
],
deps = [
"event_engine_base_hdrs",
"forkable",
"gpr_base",
"gpr_codegen",
"posix_event_engine_timer",

38
CMakeLists.txt generated

@ -959,6 +959,7 @@ if(gRPC_BUILD_TESTS)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_dependencies(buildtests_cxx fork_test)
endif()
add_dependencies(buildtests_cxx forkable_test)
add_dependencies(buildtests_cxx format_request_test)
add_dependencies(buildtests_cxx frame_handler_test)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_POSIX)
@ -2094,6 +2095,7 @@ add_library(grpc
src/core/lib/event_engine/default_event_engine.cc
src/core/lib/event_engine/default_event_engine_factory.cc
src/core/lib/event_engine/executor/threaded_executor.cc
src/core/lib/event_engine/forkable.cc
src/core/lib/event_engine/memory_allocator.cc
src/core/lib/event_engine/posix_engine/posix_engine.cc
src/core/lib/event_engine/posix_engine/timer.cc
@ -2704,6 +2706,7 @@ add_library(grpc_unsecure
src/core/lib/event_engine/default_event_engine.cc
src/core/lib/event_engine/default_event_engine_factory.cc
src/core/lib/event_engine/executor/threaded_executor.cc
src/core/lib/event_engine/forkable.cc
src/core/lib/event_engine/memory_allocator.cc
src/core/lib/event_engine/posix_engine/posix_engine.cc
src/core/lib/event_engine/posix_engine/timer.cc
@ -10162,6 +10165,41 @@ endif()
endif()
if(gRPC_BUILD_TESTS)
add_executable(forkable_test
test/core/event_engine/forkable_test.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
)
target_include_directories(forkable_test
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/include
${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
${_gRPC_RE2_INCLUDE_DIR}
${_gRPC_SSL_INCLUDE_DIR}
${_gRPC_UPB_GENERATED_DIR}
${_gRPC_UPB_GRPC_GENERATED_DIR}
${_gRPC_UPB_INCLUDE_DIR}
${_gRPC_XXHASH_INCLUDE_DIR}
${_gRPC_ZLIB_INCLUDE_DIR}
third_party/googletest/googletest/include
third_party/googletest/googletest
third_party/googletest/googlemock/include
third_party/googletest/googlemock
${_gRPC_PROTO_GENS_DIR}
)
target_link_libraries(forkable_test
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
grpc_unsecure
)
endif()
if(gRPC_BUILD_TESTS)
add_executable(format_request_test
test/core/end2end/data/client_certs.cc
test/core/end2end/data/server1_cert.cc

2
Makefile generated

@ -1454,6 +1454,7 @@ LIBGRPC_SRC = \
src/core/lib/event_engine/default_event_engine.cc \
src/core/lib/event_engine/default_event_engine_factory.cc \
src/core/lib/event_engine/executor/threaded_executor.cc \
src/core/lib/event_engine/forkable.cc \
src/core/lib/event_engine/memory_allocator.cc \
src/core/lib/event_engine/posix_engine/posix_engine.cc \
src/core/lib/event_engine/posix_engine/timer.cc \
@ -1928,6 +1929,7 @@ LIBGRPC_UNSECURE_SRC = \
src/core/lib/event_engine/default_event_engine.cc \
src/core/lib/event_engine/default_event_engine_factory.cc \
src/core/lib/event_engine/executor/threaded_executor.cc \
src/core/lib/event_engine/forkable.cc \
src/core/lib/event_engine/memory_allocator.cc \
src/core/lib/event_engine/posix_engine/posix_engine.cc \
src/core/lib/event_engine/posix_engine/timer.cc \

@ -752,6 +752,7 @@ libs:
- src/core/lib/event_engine/default_event_engine_factory.h
- src/core/lib/event_engine/executor/executor.h
- src/core/lib/event_engine/executor/threaded_executor.h
- src/core/lib/event_engine/forkable.h
- src/core/lib/event_engine/handle_containers.h
- src/core/lib/event_engine/poller.h
- src/core/lib/event_engine/posix_engine/posix_engine.h
@ -1448,6 +1449,7 @@ libs:
- src/core/lib/event_engine/default_event_engine.cc
- src/core/lib/event_engine/default_event_engine_factory.cc
- src/core/lib/event_engine/executor/threaded_executor.cc
- src/core/lib/event_engine/forkable.cc
- src/core/lib/event_engine/memory_allocator.cc
- src/core/lib/event_engine/posix_engine/posix_engine.cc
- src/core/lib/event_engine/posix_engine/timer.cc
@ -1938,6 +1940,7 @@ libs:
- src/core/lib/event_engine/default_event_engine_factory.h
- src/core/lib/event_engine/executor/executor.h
- src/core/lib/event_engine/executor/threaded_executor.h
- src/core/lib/event_engine/forkable.h
- src/core/lib/event_engine/handle_containers.h
- src/core/lib/event_engine/poller.h
- src/core/lib/event_engine/posix_engine/posix_engine.h
@ -2275,6 +2278,7 @@ libs:
- src/core/lib/event_engine/default_event_engine.cc
- src/core/lib/event_engine/default_event_engine_factory.cc
- src/core/lib/event_engine/executor/threaded_executor.cc
- src/core/lib/event_engine/forkable.cc
- src/core/lib/event_engine/memory_allocator.cc
- src/core/lib/event_engine/posix_engine/posix_engine.cc
- src/core/lib/event_engine/posix_engine/timer.cc
@ -6287,6 +6291,15 @@ targets:
- posix
- mac
uses_polling: false
- name: forkable_test
gtest: true
build: test
language: c++
headers: []
src:
- test/core/event_engine/forkable_test.cc
deps:
- grpc_unsecure
- name: format_request_test
gtest: true
build: test

1
config.m4 generated

@ -472,6 +472,7 @@ if test "$PHP_GRPC" != "no"; then
src/core/lib/event_engine/default_event_engine.cc \
src/core/lib/event_engine/default_event_engine_factory.cc \
src/core/lib/event_engine/executor/threaded_executor.cc \
src/core/lib/event_engine/forkable.cc \
src/core/lib/event_engine/memory_allocator.cc \
src/core/lib/event_engine/posix_engine/posix_engine.cc \
src/core/lib/event_engine/posix_engine/timer.cc \

1
config.w32 generated

@ -438,6 +438,7 @@ if (PHP_GRPC != "no") {
"src\\core\\lib\\event_engine\\default_event_engine.cc " +
"src\\core\\lib\\event_engine\\default_event_engine_factory.cc " +
"src\\core\\lib\\event_engine\\executor\\threaded_executor.cc " +
"src\\core\\lib\\event_engine\\forkable.cc " +
"src\\core\\lib\\event_engine\\memory_allocator.cc " +
"src\\core\\lib\\event_engine\\posix_engine\\posix_engine.cc " +
"src\\core\\lib\\event_engine\\posix_engine\\timer.cc " +

2
gRPC-C++.podspec generated

@ -684,6 +684,7 @@ Pod::Spec.new do |s|
'src/core/lib/event_engine/default_event_engine_factory.h',
'src/core/lib/event_engine/executor/executor.h',
'src/core/lib/event_engine/executor/threaded_executor.h',
'src/core/lib/event_engine/forkable.h',
'src/core/lib/event_engine/handle_containers.h',
'src/core/lib/event_engine/poller.h',
'src/core/lib/event_engine/posix_engine/posix_engine.h',
@ -1538,6 +1539,7 @@ Pod::Spec.new do |s|
'src/core/lib/event_engine/default_event_engine_factory.h',
'src/core/lib/event_engine/executor/executor.h',
'src/core/lib/event_engine/executor/threaded_executor.h',
'src/core/lib/event_engine/forkable.h',
'src/core/lib/event_engine/handle_containers.h',
'src/core/lib/event_engine/poller.h',
'src/core/lib/event_engine/posix_engine/posix_engine.h',

3
gRPC-Core.podspec generated

@ -1054,6 +1054,8 @@ Pod::Spec.new do |s|
'src/core/lib/event_engine/executor/executor.h',
'src/core/lib/event_engine/executor/threaded_executor.cc',
'src/core/lib/event_engine/executor/threaded_executor.h',
'src/core/lib/event_engine/forkable.cc',
'src/core/lib/event_engine/forkable.h',
'src/core/lib/event_engine/handle_containers.h',
'src/core/lib/event_engine/memory_allocator.cc',
'src/core/lib/event_engine/poller.h',
@ -2160,6 +2162,7 @@ Pod::Spec.new do |s|
'src/core/lib/event_engine/default_event_engine_factory.h',
'src/core/lib/event_engine/executor/executor.h',
'src/core/lib/event_engine/executor/threaded_executor.h',
'src/core/lib/event_engine/forkable.h',
'src/core/lib/event_engine/handle_containers.h',
'src/core/lib/event_engine/poller.h',
'src/core/lib/event_engine/posix_engine/posix_engine.h',

2
grpc.gemspec generated

@ -967,6 +967,8 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/event_engine/executor/executor.h )
s.files += %w( src/core/lib/event_engine/executor/threaded_executor.cc )
s.files += %w( src/core/lib/event_engine/executor/threaded_executor.h )
s.files += %w( src/core/lib/event_engine/forkable.cc )
s.files += %w( src/core/lib/event_engine/forkable.h )
s.files += %w( src/core/lib/event_engine/handle_containers.h )
s.files += %w( src/core/lib/event_engine/memory_allocator.cc )
s.files += %w( src/core/lib/event_engine/poller.h )

2
grpc.gyp generated

@ -805,6 +805,7 @@
'src/core/lib/event_engine/default_event_engine.cc',
'src/core/lib/event_engine/default_event_engine_factory.cc',
'src/core/lib/event_engine/executor/threaded_executor.cc',
'src/core/lib/event_engine/forkable.cc',
'src/core/lib/event_engine/memory_allocator.cc',
'src/core/lib/event_engine/posix_engine/posix_engine.cc',
'src/core/lib/event_engine/posix_engine/timer.cc',
@ -1247,6 +1248,7 @@
'src/core/lib/event_engine/default_event_engine.cc',
'src/core/lib/event_engine/default_event_engine_factory.cc',
'src/core/lib/event_engine/executor/threaded_executor.cc',
'src/core/lib/event_engine/forkable.cc',
'src/core/lib/event_engine/memory_allocator.cc',
'src/core/lib/event_engine/posix_engine/posix_engine.cc',
'src/core/lib/event_engine/posix_engine/timer.cc',

2
package.xml generated

@ -949,6 +949,8 @@
<file baseinstalldir="/" name="src/core/lib/event_engine/executor/executor.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/executor/threaded_executor.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/executor/threaded_executor.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/forkable.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/forkable.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/handle_containers.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/memory_allocator.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/poller.h" role="src" />

@ -0,0 +1,98 @@
// Copyright 2022 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <grpc/support/port_platform.h>
#include "src/core/lib/event_engine/forkable.h"
#ifdef GRPC_POSIX_FORK_ALLOW_PTHREAD_ATFORK
#include <pthread.h>
#include "absl/container/flat_hash_set.h"
#include "src/core/lib/gprpp/sync.h"
namespace grpc_event_engine {
namespace experimental {
namespace {
grpc_core::Mutex g_mu;
bool g_registered{false};
absl::flat_hash_set<Forkable*> g_forkables;
} // namespace
Forkable::Forkable() { ManageForkable(this); }
Forkable::~Forkable() { StopManagingForkable(this); }
void RegisterForkHandlers() {
grpc_core::MutexLock lock(&g_mu);
GPR_ASSERT(!absl::exchange(g_registered, true));
pthread_atfork(PrepareFork, PostforkParent, PostforkChild);
};
void PrepareFork() {
grpc_core::MutexLock lock(&g_mu);
for (auto* forkable : g_forkables) {
forkable->PrepareFork();
}
}
void PostforkParent() {
grpc_core::MutexLock lock(&g_mu);
for (auto* forkable : g_forkables) {
forkable->PostforkParent();
}
}
void PostforkChild() {
grpc_core::MutexLock lock(&g_mu);
for (auto* forkable : g_forkables) {
forkable->PostforkChild();
}
}
void ManageForkable(Forkable* forkable) {
grpc_core::MutexLock lock(&g_mu);
g_forkables.insert(forkable);
}
void StopManagingForkable(Forkable* forkable) {
grpc_core::MutexLock lock(&g_mu);
g_forkables.erase(forkable);
}
} // namespace experimental
} // namespace grpc_event_engine
#else // GRPC_POSIX_FORK_ALLOW_PTHREAD_ATFORK
namespace grpc_event_engine {
namespace experimental {
Forkable::Forkable() {}
Forkable::~Forkable() {}
void RegisterForkHandlers() {}
void PrepareFork() {}
void PostforkParent() {}
void PostforkChild() {}
void ManageForkable(Forkable* /* forkable */) {}
void StopManagingForkable(Forkable* /* forkable */) {}
} // namespace experimental
} // namespace grpc_event_engine
#endif // GRPC_POSIX_FORK_ALLOW_PTHREAD_ATFORK

@ -0,0 +1,61 @@
// Copyright 2022 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_CORE_LIB_EVENT_ENGINE_FORKABLE_H
#define GRPC_CORE_LIB_EVENT_ENGINE_FORKABLE_H
#include <grpc/support/port_platform.h>
namespace grpc_event_engine {
namespace experimental {
// Register fork handlers with the system, enabling fork support.
//
// This provides pthread-based support for fork events. Any objects that
// implement Forkable can register themselves with this system using
// ManageForkable, and their respective methods will be called upon fork.
//
// This should be called once upon grpc_initialization.
void RegisterForkHandlers();
// Global callback for pthread_atfork's *prepare argument
void PrepareFork();
// Global callback for pthread_atfork's *parent argument
void PostforkParent();
// Global callback for pthread_atfork's *child argument
void PostforkChild();
// An interface to be implemented by EventEngines that wish to have managed fork
// support.
class Forkable {
public:
Forkable();
virtual ~Forkable();
virtual void PrepareFork() = 0;
virtual void PostforkParent() = 0;
virtual void PostforkChild() = 0;
};
// Add Forkables from the set of objects that are supported.
// Upon fork, each forkable will have its respective fork hooks called on
// the thread that invoked the fork.
//
// Relative ordering of fork callback operations is not guaranteed.
void ManageForkable(Forkable* forkable);
// Remove a forkable from the managed set.
void StopManagingForkable(Forkable* forkable);
} // namespace experimental
} // namespace grpc_event_engine
#endif // GRPC_CORE_LIB_EVENT_ENGINE_FORKABLE_H

@ -51,8 +51,6 @@ struct PosixEventEngine::ClosureData final : public EventEngine::Closure {
}
};
PosixEventEngine::PosixEventEngine() {}
PosixEventEngine::~PosixEventEngine() {
grpc_core::MutexLock lock(&mu_);
if (GRPC_TRACE_FLAG_ENABLED(grpc_event_engine_trace)) {

@ -75,7 +75,7 @@ class PosixEventEngine final : public EventEngine {
bool CancelLookup(LookupTaskHandle handle) override;
};
PosixEventEngine();
PosixEventEngine() = default;
~PosixEventEngine() override;
absl::StatusOr<std::unique_ptr<Listener>> CreateListener(

@ -73,12 +73,14 @@ void TimerManager::RunSomeTimers(
ThreadCollector collector;
{
grpc_core::MutexLock lock(&mu_);
if (shutdown_ || forking_) return;
// remove a waiter from the pool, and start another thread if necessary
--waiter_count_;
if (waiter_count_ == 0) {
// The number of timer threads is always increasing until all the threads
// are stopped. In rare cases, if a large number of timers fire
// simultaneously, we may end up using a large number of threads.
// are stopped, with the exception that all threads are shut down on fork
// events. In rare cases, if a large number of timers fire simultaneously,
// we may end up using a large number of threads.
// TODO(ctiller): We could avoid this by exiting threads in WaitUntil().
StartThread();
} else {
@ -106,9 +108,8 @@ void TimerManager::RunSomeTimers(
bool TimerManager::WaitUntil(grpc_core::Timestamp next) {
grpc_core::MutexLock lock(&mu_);
if (shutdown_) {
return false;
}
if (shutdown_) return false;
if (forking_) return false;
// TODO(ctiller): if there are too many waiting threads, this would be a good
// place to exit the current thread.
@ -250,5 +251,37 @@ void TimerManager::Kick() {
cv_.Signal();
}
void TimerManager::PrepareFork() {
{
grpc_core::MutexLock lock(&mu_);
forking_ = true;
prefork_thread_count_ = thread_count_;
cv_.SignalAll();
}
while (true) {
grpc_core::MutexLock lock(&mu_);
ThreadCollector collector;
collector.Collect(std::move(completed_threads_));
if (thread_count_ == 0) break;
cv_.Wait(&mu_);
}
}
void TimerManager::PostforkParent() {
grpc_core::MutexLock lock(&mu_);
for (int i = 0; i < prefork_thread_count_; i++) {
StartThread();
}
prefork_thread_count_ = 0;
}
void TimerManager::PostforkChild() {
grpc_core::MutexLock lock(&mu_);
for (int i = 0; i < prefork_thread_count_; i++) {
StartThread();
}
prefork_thread_count_ = 0;
}
} // namespace posix_engine
} // namespace grpc_event_engine

@ -31,6 +31,7 @@
#include <grpc/event_engine/event_engine.h>
#include "src/core/lib/event_engine/forkable.h"
#include "src/core/lib/event_engine/posix_engine/timer.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/gprpp/thd.h"
@ -43,10 +44,10 @@ namespace posix_engine {
// all times, and thus effectively preventing the thundering herd problem.
// TODO(ctiller): consider unifying this thread pool and the one in
// thread_pool.{h,cc}.
class TimerManager final {
class TimerManager final : public grpc_event_engine::experimental::Forkable {
public:
TimerManager();
~TimerManager();
~TimerManager() override;
grpc_core::Timestamp Now() { return host_.Now(); }
@ -54,6 +55,11 @@ class TimerManager final {
experimental::EventEngine::Closure* closure);
bool TimerCancel(Timer* timer);
// Forkable
void PrepareFork() override;
void PostforkParent() override;
void PostforkChild() override;
private:
struct RunThreadArgs {
TimerManager* self;
@ -92,6 +98,8 @@ class TimerManager final {
bool has_timed_waiter_ ABSL_GUARDED_BY(mu_) = false;
// are we shutting down?
bool shutdown_ ABSL_GUARDED_BY(mu_) = false;
// are we forking?
bool forking_ ABSL_GUARDED_BY(mu_) = false;
// are we shutting down?
bool kicked_ ABSL_GUARDED_BY(mu_) = false;
// the deadline of the current timed waiter thread (only relevant if
@ -103,6 +111,7 @@ class TimerManager final {
uint64_t wakeups_ ABSL_GUARDED_BY(mu_) = 0;
// actual timer implementation
std::unique_ptr<TimerList> timer_list_;
int prefork_thread_count_ = 0;
};
} // namespace posix_engine

@ -45,8 +45,9 @@ void ThreadPool::Thread::ThreadFunc() {
// Move ourselves to dead list
pool_->dead_threads_.push_back(this);
if ((pool_->shutdown_) && (pool_->nthreads_ == 0)) {
pool_->shutdown_cv_.Signal();
if (pool_->nthreads_ == 0) {
if (pool_->forking_) pool_->fork_cv_.Signal();
if (pool_->shutdown_) pool_->shutdown_cv_.Signal();
}
}
@ -54,7 +55,7 @@ void ThreadPool::ThreadFunc() {
for (;;) {
// Wait until work is available or we are shutting down.
grpc_core::ReleasableMutexLock lock(&mu_);
if (!shutdown_ && callbacks_.empty()) {
if (!forking_ && !shutdown_ && callbacks_.empty()) {
// If there are too many threads waiting, then quit this thread
if (threads_waiting_ >= reserve_threads_) {
break;
@ -63,6 +64,8 @@ void ThreadPool::ThreadFunc() {
cv_.Wait(&mu_);
threads_waiting_--;
}
// a fork could be initiated while the thread was waiting
if (forking_) return;
// Drain callbacks before considering shutdown to ensure all work
// gets completed.
if (!callbacks_.empty()) {
@ -80,9 +83,14 @@ ThreadPool::ThreadPool(int reserve_threads)
: shutdown_(false),
reserve_threads_(reserve_threads),
nthreads_(0),
threads_waiting_(0) {
for (int i = 0; i < reserve_threads_; i++) {
threads_waiting_(0),
forking_(false) {
grpc_core::MutexLock lock(&mu_);
StartNThreadsLocked(reserve_threads_);
}
void ThreadPool::StartNThreadsLocked(int n) {
for (int i = 0; i < n; i++) {
nthreads_++;
new Thread(this);
}
@ -107,6 +115,9 @@ void ThreadPool::Add(absl::AnyInvocable<void()> callback) {
grpc_core::MutexLock lock(&mu_);
// Add works to the callbacks list
callbacks_.push(std::move(callback));
// Store the callback for later if we are forking.
// TODO(hork): should we block instead?
if (forking_) return;
// Increase pool size or notify as needed
if (threads_waiting_ == 0) {
// Kick off a new thread
@ -121,5 +132,27 @@ void ThreadPool::Add(absl::AnyInvocable<void()> callback) {
}
}
void ThreadPool::PrepareFork() {
grpc_core::MutexLock lock(&mu_);
forking_ = true;
cv_.SignalAll();
while (nthreads_ != 0) {
fork_cv_.Wait(&mu_);
}
ReapThreads(&dead_threads_);
}
void ThreadPool::PostforkParent() {
grpc_core::MutexLock lock(&mu_);
forking_ = false;
StartNThreadsLocked(reserve_threads_);
}
void ThreadPool::PostforkChild() {
grpc_core::MutexLock lock(&mu_);
forking_ = false;
StartNThreadsLocked(reserve_threads_);
}
} // namespace experimental
} // namespace grpc_event_engine

@ -24,21 +24,28 @@
#include <queue>
#include <vector>
#include "absl/base/thread_annotations.h"
#include "absl/functional/any_invocable.h"
#include "src/core/lib/event_engine/forkable.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/gprpp/thd.h"
namespace grpc_event_engine {
namespace experimental {
class ThreadPool final {
class ThreadPool final : public grpc_event_engine::experimental::Forkable {
public:
explicit ThreadPool(int reserve_threads);
~ThreadPool();
~ThreadPool() override;
void Add(absl::AnyInvocable<void()> callback);
// Forkable
void PrepareFork() override;
void PostforkParent() override;
void PostforkChild() override;
private:
class Thread {
public:
@ -52,17 +59,20 @@ class ThreadPool final {
};
void ThreadFunc();
void StartNThreadsLocked(int n) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_);
static void ReapThreads(std::vector<Thread*>* tlist);
grpc_core::Mutex mu_;
grpc_core::CondVar cv_;
grpc_core::CondVar shutdown_cv_;
grpc_core::CondVar fork_cv_;
bool shutdown_;
std::queue<absl::AnyInvocable<void()>> callbacks_;
int reserve_threads_;
int nthreads_;
int threads_waiting_;
std::vector<Thread*> dead_threads_;
bool forking_;
};
} // namespace experimental

@ -37,6 +37,7 @@
#include "src/core/lib/channel/channel_stack_builder.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/event_engine/forkable.h"
#include "src/core/lib/gprpp/fork.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/gprpp/thd.h"
@ -147,6 +148,7 @@ void grpc_init(void) {
g_shutting_down_cv->SignalAll();
}
grpc_core::Fork::GlobalInit();
grpc_event_engine::experimental::RegisterForkHandlers();
grpc_fork_handlers_auto_register();
grpc_core::ApplicationCallbackExecCtx::GlobalInit();
grpc_iomgr_init();

@ -447,6 +447,7 @@ CORE_SOURCE_FILES = [
'src/core/lib/event_engine/default_event_engine.cc',
'src/core/lib/event_engine/default_event_engine_factory.cc',
'src/core/lib/event_engine/executor/threaded_executor.cc',
'src/core/lib/event_engine/forkable.cc',
'src/core/lib/event_engine/memory_allocator.cc',
'src/core/lib/event_engine/posix_engine/posix_engine.cc',
'src/core/lib/event_engine/posix_engine/timer.cc',

@ -33,6 +33,21 @@ grpc_cc_test(
],
)
grpc_cc_test(
name = "forkable_test",
srcs = ["forkable_test.cc"],
external_deps = [
"absl/time",
"gtest",
],
deps = [
"//:forkable",
"//:gpr_base",
"//:gpr_platform",
"//:grpc_unsecure",
],
)
grpc_cc_test(
name = "endpoint_config_test",
srcs = ["endpoint_config_test.cc"],

@ -0,0 +1,103 @@
// Copyright 2022 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <grpc/support/port_platform.h>
#ifndef GRPC_ENABLE_FORK_SUPPORT
// Test nothing, everything is fine
int main(int /* argc */, char** /* argv */) { return 0; }
#else // GRPC_ENABLE_FORK_SUPPORT
#include <sys/wait.h>
#include <unistd.h>
#include <gtest/gtest.h>
#include "absl/time/clock.h"
#include <grpc/grpc.h>
#include <grpc/support/log.h>
#include "src/core/lib/event_engine/forkable.h"
namespace {
using ::grpc_event_engine::experimental::Forkable;
using ::grpc_event_engine::experimental::RegisterForkHandlers;
} // namespace
class ForkableTest : public testing::Test {};
#ifdef GRPC_POSIX_FORK_ALLOW_PTHREAD_ATFORK
TEST_F(ForkableTest, BasicPthreadAtForkOperations) {
class SomeForkable : public Forkable {
public:
void PrepareFork() override { prepare_called_ = true; }
void PostforkParent() override { parent_called_ = true; }
void PostforkChild() override { child_called_ = true; }
void CheckParent() {
EXPECT_TRUE(prepare_called_);
EXPECT_TRUE(parent_called_);
EXPECT_FALSE(child_called_);
}
void CheckChild() {
EXPECT_TRUE(prepare_called_);
EXPECT_FALSE(parent_called_);
EXPECT_TRUE(child_called_);
}
private:
bool prepare_called_ = false;
bool parent_called_ = false;
bool child_called_ = false;
};
SomeForkable forkable;
int child_pid = fork();
ASSERT_NE(child_pid, -1);
if (child_pid == 0) {
gpr_log(GPR_DEBUG, "I am child pid: %d", getpid());
forkable.CheckChild();
exit(testing::Test::HasFailure());
} else {
gpr_log(GPR_DEBUG, "I am parent pid: %d", getpid());
forkable.CheckParent();
int status;
gpr_log(GPR_DEBUG, "Waiting for child pid: %d", child_pid);
do {
// retry on EINTR, and fail otherwise
if (waitpid(child_pid, &status, 0) != -1) break;
ASSERT_EQ(errno, EINTR);
} while (true);
if (WIFEXITED(status)) {
ASSERT_EQ(WEXITSTATUS(status), 0);
} else {
// exited abnormally, fail and print the exit status
ASSERT_EQ(-99, status);
}
}
}
#endif // GRPC_POSIX_FORK_ALLOW_PTHREAD_ATFORK
int main(int argc, char** argv) {
testing::InitGoogleTest(&argc, argv);
RegisterForkHandlers();
auto result = RUN_ALL_TESTS();
return result;
}
#endif // GRPC_ENABLE_FORK_SUPPORT

@ -1950,6 +1950,8 @@ src/core/lib/event_engine/default_event_engine_factory.h \
src/core/lib/event_engine/executor/executor.h \
src/core/lib/event_engine/executor/threaded_executor.cc \
src/core/lib/event_engine/executor/threaded_executor.h \
src/core/lib/event_engine/forkable.cc \
src/core/lib/event_engine/forkable.h \
src/core/lib/event_engine/handle_containers.h \
src/core/lib/event_engine/memory_allocator.cc \
src/core/lib/event_engine/poller.h \

@ -1740,6 +1740,8 @@ src/core/lib/event_engine/default_event_engine_factory.h \
src/core/lib/event_engine/executor/executor.h \
src/core/lib/event_engine/executor/threaded_executor.cc \
src/core/lib/event_engine/executor/threaded_executor.h \
src/core/lib/event_engine/forkable.cc \
src/core/lib/event_engine/forkable.h \
src/core/lib/event_engine/handle_containers.h \
src/core/lib/event_engine/memory_allocator.cc \
src/core/lib/event_engine/poller.h \

@ -3101,6 +3101,30 @@
],
"uses_polling": false
},
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"mac",
"posix",
"windows"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": true,
"language": "c++",
"name": "forkable_test",
"platforms": [
"linux",
"mac",
"posix",
"windows"
],
"uses_polling": true
},
{
"args": [],
"benchmark": false,

Loading…
Cancel
Save