diff --git a/BUILD b/BUILD index b3ac78cc859..5a0b617c817 100644 --- a/BUILD +++ b/BUILD @@ -417,6 +417,7 @@ grpc_cc_library( "channel_stack_type", "config", "default_event_engine", + "forkable", "gpr_base", "grpc_authorization_base", "grpc_base", @@ -480,6 +481,7 @@ grpc_cc_library( "channel_stack_type", "config", "default_event_engine", + "forkable", "gpr_base", "grpc_authorization_base", "grpc_base", @@ -2221,6 +2223,21 @@ grpc_cc_library( deps = ["gpr_base"], ) +grpc_cc_library( + name = "forkable", + srcs = [ + "src/core/lib/event_engine/forkable.cc", + ], + hdrs = [ + "src/core/lib/event_engine/forkable.h", + ], + external_deps = ["absl/container:flat_hash_set"], + deps = [ + "gpr_base", + "gpr_platform", + ], +) + grpc_cc_library( name = "event_engine_poller", hdrs = [ @@ -2314,8 +2331,14 @@ grpc_cc_library( hdrs = [ "src/core/lib/event_engine/thread_pool.h", ], - external_deps = ["absl/functional:any_invocable"], - deps = ["gpr_base"], + external_deps = [ + "absl/base:core_headers", + "absl/functional:any_invocable", + ], + deps = [ + "forkable", + "gpr_base", + ], ) grpc_cc_library( @@ -2332,6 +2355,7 @@ grpc_cc_library( ], deps = [ "event_engine_base_hdrs", + "forkable", "gpr_base", "gpr_codegen", "posix_event_engine_timer", diff --git a/CMakeLists.txt b/CMakeLists.txt index e4832431a40..c34728e9632 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -959,6 +959,7 @@ if(gRPC_BUILD_TESTS) if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) add_dependencies(buildtests_cxx fork_test) endif() + add_dependencies(buildtests_cxx forkable_test) add_dependencies(buildtests_cxx format_request_test) add_dependencies(buildtests_cxx frame_handler_test) if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_POSIX) @@ -2094,6 +2095,7 @@ add_library(grpc src/core/lib/event_engine/default_event_engine.cc src/core/lib/event_engine/default_event_engine_factory.cc src/core/lib/event_engine/executor/threaded_executor.cc + src/core/lib/event_engine/forkable.cc src/core/lib/event_engine/memory_allocator.cc src/core/lib/event_engine/posix_engine/posix_engine.cc src/core/lib/event_engine/posix_engine/timer.cc @@ -2704,6 +2706,7 @@ add_library(grpc_unsecure src/core/lib/event_engine/default_event_engine.cc src/core/lib/event_engine/default_event_engine_factory.cc src/core/lib/event_engine/executor/threaded_executor.cc + src/core/lib/event_engine/forkable.cc src/core/lib/event_engine/memory_allocator.cc src/core/lib/event_engine/posix_engine/posix_engine.cc src/core/lib/event_engine/posix_engine/timer.cc @@ -10162,6 +10165,41 @@ endif() endif() if(gRPC_BUILD_TESTS) +add_executable(forkable_test + test/core/event_engine/forkable_test.cc + third_party/googletest/googletest/src/gtest-all.cc + third_party/googletest/googlemock/src/gmock-all.cc +) + +target_include_directories(forkable_test + PRIVATE + ${CMAKE_CURRENT_SOURCE_DIR} + ${CMAKE_CURRENT_SOURCE_DIR}/include + ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR} + ${_gRPC_RE2_INCLUDE_DIR} + ${_gRPC_SSL_INCLUDE_DIR} + ${_gRPC_UPB_GENERATED_DIR} + ${_gRPC_UPB_GRPC_GENERATED_DIR} + ${_gRPC_UPB_INCLUDE_DIR} + ${_gRPC_XXHASH_INCLUDE_DIR} + ${_gRPC_ZLIB_INCLUDE_DIR} + third_party/googletest/googletest/include + third_party/googletest/googletest + third_party/googletest/googlemock/include + third_party/googletest/googlemock + ${_gRPC_PROTO_GENS_DIR} +) + +target_link_libraries(forkable_test + ${_gRPC_PROTOBUF_LIBRARIES} + ${_gRPC_ALLTARGETS_LIBRARIES} + grpc_unsecure +) + + +endif() +if(gRPC_BUILD_TESTS) + add_executable(format_request_test test/core/end2end/data/client_certs.cc test/core/end2end/data/server1_cert.cc diff --git a/Makefile b/Makefile index 2b2d3feea1c..45cdc607513 100644 --- a/Makefile +++ b/Makefile @@ -1454,6 +1454,7 @@ LIBGRPC_SRC = \ src/core/lib/event_engine/default_event_engine.cc \ src/core/lib/event_engine/default_event_engine_factory.cc \ src/core/lib/event_engine/executor/threaded_executor.cc \ + src/core/lib/event_engine/forkable.cc \ src/core/lib/event_engine/memory_allocator.cc \ src/core/lib/event_engine/posix_engine/posix_engine.cc \ src/core/lib/event_engine/posix_engine/timer.cc \ @@ -1928,6 +1929,7 @@ LIBGRPC_UNSECURE_SRC = \ src/core/lib/event_engine/default_event_engine.cc \ src/core/lib/event_engine/default_event_engine_factory.cc \ src/core/lib/event_engine/executor/threaded_executor.cc \ + src/core/lib/event_engine/forkable.cc \ src/core/lib/event_engine/memory_allocator.cc \ src/core/lib/event_engine/posix_engine/posix_engine.cc \ src/core/lib/event_engine/posix_engine/timer.cc \ diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index 7f87f51f741..55b6dd7cdcd 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -752,6 +752,7 @@ libs: - src/core/lib/event_engine/default_event_engine_factory.h - src/core/lib/event_engine/executor/executor.h - src/core/lib/event_engine/executor/threaded_executor.h + - src/core/lib/event_engine/forkable.h - src/core/lib/event_engine/handle_containers.h - src/core/lib/event_engine/poller.h - src/core/lib/event_engine/posix_engine/posix_engine.h @@ -1448,6 +1449,7 @@ libs: - src/core/lib/event_engine/default_event_engine.cc - src/core/lib/event_engine/default_event_engine_factory.cc - src/core/lib/event_engine/executor/threaded_executor.cc + - src/core/lib/event_engine/forkable.cc - src/core/lib/event_engine/memory_allocator.cc - src/core/lib/event_engine/posix_engine/posix_engine.cc - src/core/lib/event_engine/posix_engine/timer.cc @@ -1938,6 +1940,7 @@ libs: - src/core/lib/event_engine/default_event_engine_factory.h - src/core/lib/event_engine/executor/executor.h - src/core/lib/event_engine/executor/threaded_executor.h + - src/core/lib/event_engine/forkable.h - src/core/lib/event_engine/handle_containers.h - src/core/lib/event_engine/poller.h - src/core/lib/event_engine/posix_engine/posix_engine.h @@ -2275,6 +2278,7 @@ libs: - src/core/lib/event_engine/default_event_engine.cc - src/core/lib/event_engine/default_event_engine_factory.cc - src/core/lib/event_engine/executor/threaded_executor.cc + - src/core/lib/event_engine/forkable.cc - src/core/lib/event_engine/memory_allocator.cc - src/core/lib/event_engine/posix_engine/posix_engine.cc - src/core/lib/event_engine/posix_engine/timer.cc @@ -6287,6 +6291,15 @@ targets: - posix - mac uses_polling: false +- name: forkable_test + gtest: true + build: test + language: c++ + headers: [] + src: + - test/core/event_engine/forkable_test.cc + deps: + - grpc_unsecure - name: format_request_test gtest: true build: test diff --git a/config.m4 b/config.m4 index dbdf6a3d0c6..801c65e6c1f 100644 --- a/config.m4 +++ b/config.m4 @@ -472,6 +472,7 @@ if test "$PHP_GRPC" != "no"; then src/core/lib/event_engine/default_event_engine.cc \ src/core/lib/event_engine/default_event_engine_factory.cc \ src/core/lib/event_engine/executor/threaded_executor.cc \ + src/core/lib/event_engine/forkable.cc \ src/core/lib/event_engine/memory_allocator.cc \ src/core/lib/event_engine/posix_engine/posix_engine.cc \ src/core/lib/event_engine/posix_engine/timer.cc \ diff --git a/config.w32 b/config.w32 index 68213c2a4d3..03445a3c6dd 100644 --- a/config.w32 +++ b/config.w32 @@ -438,6 +438,7 @@ if (PHP_GRPC != "no") { "src\\core\\lib\\event_engine\\default_event_engine.cc " + "src\\core\\lib\\event_engine\\default_event_engine_factory.cc " + "src\\core\\lib\\event_engine\\executor\\threaded_executor.cc " + + "src\\core\\lib\\event_engine\\forkable.cc " + "src\\core\\lib\\event_engine\\memory_allocator.cc " + "src\\core\\lib\\event_engine\\posix_engine\\posix_engine.cc " + "src\\core\\lib\\event_engine\\posix_engine\\timer.cc " + diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec index dcced15e77a..d6ba831478d 100644 --- a/gRPC-C++.podspec +++ b/gRPC-C++.podspec @@ -684,6 +684,7 @@ Pod::Spec.new do |s| 'src/core/lib/event_engine/default_event_engine_factory.h', 'src/core/lib/event_engine/executor/executor.h', 'src/core/lib/event_engine/executor/threaded_executor.h', + 'src/core/lib/event_engine/forkable.h', 'src/core/lib/event_engine/handle_containers.h', 'src/core/lib/event_engine/poller.h', 'src/core/lib/event_engine/posix_engine/posix_engine.h', @@ -1538,6 +1539,7 @@ Pod::Spec.new do |s| 'src/core/lib/event_engine/default_event_engine_factory.h', 'src/core/lib/event_engine/executor/executor.h', 'src/core/lib/event_engine/executor/threaded_executor.h', + 'src/core/lib/event_engine/forkable.h', 'src/core/lib/event_engine/handle_containers.h', 'src/core/lib/event_engine/poller.h', 'src/core/lib/event_engine/posix_engine/posix_engine.h', diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index 41a68fe464b..6dc6cc2c03f 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -1054,6 +1054,8 @@ Pod::Spec.new do |s| 'src/core/lib/event_engine/executor/executor.h', 'src/core/lib/event_engine/executor/threaded_executor.cc', 'src/core/lib/event_engine/executor/threaded_executor.h', + 'src/core/lib/event_engine/forkable.cc', + 'src/core/lib/event_engine/forkable.h', 'src/core/lib/event_engine/handle_containers.h', 'src/core/lib/event_engine/memory_allocator.cc', 'src/core/lib/event_engine/poller.h', @@ -2160,6 +2162,7 @@ Pod::Spec.new do |s| 'src/core/lib/event_engine/default_event_engine_factory.h', 'src/core/lib/event_engine/executor/executor.h', 'src/core/lib/event_engine/executor/threaded_executor.h', + 'src/core/lib/event_engine/forkable.h', 'src/core/lib/event_engine/handle_containers.h', 'src/core/lib/event_engine/poller.h', 'src/core/lib/event_engine/posix_engine/posix_engine.h', diff --git a/grpc.gemspec b/grpc.gemspec index db96e3ffba5..c12c9e67cfc 100644 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -967,6 +967,8 @@ Gem::Specification.new do |s| s.files += %w( src/core/lib/event_engine/executor/executor.h ) s.files += %w( src/core/lib/event_engine/executor/threaded_executor.cc ) s.files += %w( src/core/lib/event_engine/executor/threaded_executor.h ) + s.files += %w( src/core/lib/event_engine/forkable.cc ) + s.files += %w( src/core/lib/event_engine/forkable.h ) s.files += %w( src/core/lib/event_engine/handle_containers.h ) s.files += %w( src/core/lib/event_engine/memory_allocator.cc ) s.files += %w( src/core/lib/event_engine/poller.h ) diff --git a/grpc.gyp b/grpc.gyp index 97bcee758c2..8cc8ae3d55c 100644 --- a/grpc.gyp +++ b/grpc.gyp @@ -805,6 +805,7 @@ 'src/core/lib/event_engine/default_event_engine.cc', 'src/core/lib/event_engine/default_event_engine_factory.cc', 'src/core/lib/event_engine/executor/threaded_executor.cc', + 'src/core/lib/event_engine/forkable.cc', 'src/core/lib/event_engine/memory_allocator.cc', 'src/core/lib/event_engine/posix_engine/posix_engine.cc', 'src/core/lib/event_engine/posix_engine/timer.cc', @@ -1247,6 +1248,7 @@ 'src/core/lib/event_engine/default_event_engine.cc', 'src/core/lib/event_engine/default_event_engine_factory.cc', 'src/core/lib/event_engine/executor/threaded_executor.cc', + 'src/core/lib/event_engine/forkable.cc', 'src/core/lib/event_engine/memory_allocator.cc', 'src/core/lib/event_engine/posix_engine/posix_engine.cc', 'src/core/lib/event_engine/posix_engine/timer.cc', diff --git a/package.xml b/package.xml index 7e1465a22d4..3981449aecf 100644 --- a/package.xml +++ b/package.xml @@ -949,6 +949,8 @@ + + diff --git a/src/core/lib/event_engine/forkable.cc b/src/core/lib/event_engine/forkable.cc new file mode 100644 index 00000000000..fd1d34dc00f --- /dev/null +++ b/src/core/lib/event_engine/forkable.cc @@ -0,0 +1,98 @@ +// Copyright 2022 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include "src/core/lib/event_engine/forkable.h" + +#ifdef GRPC_POSIX_FORK_ALLOW_PTHREAD_ATFORK + +#include + +#include "absl/container/flat_hash_set.h" + +#include "src/core/lib/gprpp/sync.h" + +namespace grpc_event_engine { +namespace experimental { + +namespace { +grpc_core::Mutex g_mu; +bool g_registered{false}; +absl::flat_hash_set g_forkables; +} // namespace + +Forkable::Forkable() { ManageForkable(this); } + +Forkable::~Forkable() { StopManagingForkable(this); } + +void RegisterForkHandlers() { + grpc_core::MutexLock lock(&g_mu); + GPR_ASSERT(!absl::exchange(g_registered, true)); + pthread_atfork(PrepareFork, PostforkParent, PostforkChild); +}; + +void PrepareFork() { + grpc_core::MutexLock lock(&g_mu); + for (auto* forkable : g_forkables) { + forkable->PrepareFork(); + } +} +void PostforkParent() { + grpc_core::MutexLock lock(&g_mu); + for (auto* forkable : g_forkables) { + forkable->PostforkParent(); + } +} + +void PostforkChild() { + grpc_core::MutexLock lock(&g_mu); + for (auto* forkable : g_forkables) { + forkable->PostforkChild(); + } +} + +void ManageForkable(Forkable* forkable) { + grpc_core::MutexLock lock(&g_mu); + g_forkables.insert(forkable); +} + +void StopManagingForkable(Forkable* forkable) { + grpc_core::MutexLock lock(&g_mu); + g_forkables.erase(forkable); +} + +} // namespace experimental +} // namespace grpc_event_engine + +#else // GRPC_POSIX_FORK_ALLOW_PTHREAD_ATFORK + +namespace grpc_event_engine { +namespace experimental { + +Forkable::Forkable() {} +Forkable::~Forkable() {} + +void RegisterForkHandlers() {} +void PrepareFork() {} +void PostforkParent() {} +void PostforkChild() {} + +void ManageForkable(Forkable* /* forkable */) {} +void StopManagingForkable(Forkable* /* forkable */) {} + +} // namespace experimental +} // namespace grpc_event_engine + +#endif // GRPC_POSIX_FORK_ALLOW_PTHREAD_ATFORK diff --git a/src/core/lib/event_engine/forkable.h b/src/core/lib/event_engine/forkable.h new file mode 100644 index 00000000000..de50a6bd4eb --- /dev/null +++ b/src/core/lib/event_engine/forkable.h @@ -0,0 +1,61 @@ +// Copyright 2022 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#ifndef GRPC_CORE_LIB_EVENT_ENGINE_FORKABLE_H +#define GRPC_CORE_LIB_EVENT_ENGINE_FORKABLE_H + +#include + +namespace grpc_event_engine { +namespace experimental { + +// Register fork handlers with the system, enabling fork support. +// +// This provides pthread-based support for fork events. Any objects that +// implement Forkable can register themselves with this system using +// ManageForkable, and their respective methods will be called upon fork. +// +// This should be called once upon grpc_initialization. +void RegisterForkHandlers(); + +// Global callback for pthread_atfork's *prepare argument +void PrepareFork(); +// Global callback for pthread_atfork's *parent argument +void PostforkParent(); +// Global callback for pthread_atfork's *child argument +void PostforkChild(); + +// An interface to be implemented by EventEngines that wish to have managed fork +// support. +class Forkable { + public: + Forkable(); + virtual ~Forkable(); + virtual void PrepareFork() = 0; + virtual void PostforkParent() = 0; + virtual void PostforkChild() = 0; +}; + +// Add Forkables from the set of objects that are supported. +// Upon fork, each forkable will have its respective fork hooks called on +// the thread that invoked the fork. +// +// Relative ordering of fork callback operations is not guaranteed. +void ManageForkable(Forkable* forkable); +// Remove a forkable from the managed set. +void StopManagingForkable(Forkable* forkable); + +} // namespace experimental +} // namespace grpc_event_engine + +#endif // GRPC_CORE_LIB_EVENT_ENGINE_FORKABLE_H \ No newline at end of file diff --git a/src/core/lib/event_engine/posix_engine/posix_engine.cc b/src/core/lib/event_engine/posix_engine/posix_engine.cc index 4c187d59a71..07577b6a97b 100644 --- a/src/core/lib/event_engine/posix_engine/posix_engine.cc +++ b/src/core/lib/event_engine/posix_engine/posix_engine.cc @@ -51,8 +51,6 @@ struct PosixEventEngine::ClosureData final : public EventEngine::Closure { } }; -PosixEventEngine::PosixEventEngine() {} - PosixEventEngine::~PosixEventEngine() { grpc_core::MutexLock lock(&mu_); if (GRPC_TRACE_FLAG_ENABLED(grpc_event_engine_trace)) { diff --git a/src/core/lib/event_engine/posix_engine/posix_engine.h b/src/core/lib/event_engine/posix_engine/posix_engine.h index 2170a6ae8d0..eac6dfb4c53 100644 --- a/src/core/lib/event_engine/posix_engine/posix_engine.h +++ b/src/core/lib/event_engine/posix_engine/posix_engine.h @@ -75,7 +75,7 @@ class PosixEventEngine final : public EventEngine { bool CancelLookup(LookupTaskHandle handle) override; }; - PosixEventEngine(); + PosixEventEngine() = default; ~PosixEventEngine() override; absl::StatusOr> CreateListener( diff --git a/src/core/lib/event_engine/posix_engine/timer_manager.cc b/src/core/lib/event_engine/posix_engine/timer_manager.cc index fa2db0ce903..633831d7e70 100644 --- a/src/core/lib/event_engine/posix_engine/timer_manager.cc +++ b/src/core/lib/event_engine/posix_engine/timer_manager.cc @@ -73,12 +73,14 @@ void TimerManager::RunSomeTimers( ThreadCollector collector; { grpc_core::MutexLock lock(&mu_); + if (shutdown_ || forking_) return; // remove a waiter from the pool, and start another thread if necessary --waiter_count_; if (waiter_count_ == 0) { // The number of timer threads is always increasing until all the threads - // are stopped. In rare cases, if a large number of timers fire - // simultaneously, we may end up using a large number of threads. + // are stopped, with the exception that all threads are shut down on fork + // events. In rare cases, if a large number of timers fire simultaneously, + // we may end up using a large number of threads. // TODO(ctiller): We could avoid this by exiting threads in WaitUntil(). StartThread(); } else { @@ -106,9 +108,8 @@ void TimerManager::RunSomeTimers( bool TimerManager::WaitUntil(grpc_core::Timestamp next) { grpc_core::MutexLock lock(&mu_); - if (shutdown_) { - return false; - } + if (shutdown_) return false; + if (forking_) return false; // TODO(ctiller): if there are too many waiting threads, this would be a good // place to exit the current thread. @@ -250,5 +251,37 @@ void TimerManager::Kick() { cv_.Signal(); } +void TimerManager::PrepareFork() { + { + grpc_core::MutexLock lock(&mu_); + forking_ = true; + prefork_thread_count_ = thread_count_; + cv_.SignalAll(); + } + while (true) { + grpc_core::MutexLock lock(&mu_); + ThreadCollector collector; + collector.Collect(std::move(completed_threads_)); + if (thread_count_ == 0) break; + cv_.Wait(&mu_); + } +} + +void TimerManager::PostforkParent() { + grpc_core::MutexLock lock(&mu_); + for (int i = 0; i < prefork_thread_count_; i++) { + StartThread(); + } + prefork_thread_count_ = 0; +} + +void TimerManager::PostforkChild() { + grpc_core::MutexLock lock(&mu_); + for (int i = 0; i < prefork_thread_count_; i++) { + StartThread(); + } + prefork_thread_count_ = 0; +} + } // namespace posix_engine } // namespace grpc_event_engine diff --git a/src/core/lib/event_engine/posix_engine/timer_manager.h b/src/core/lib/event_engine/posix_engine/timer_manager.h index f3dfd69d090..e3a429e687d 100644 --- a/src/core/lib/event_engine/posix_engine/timer_manager.h +++ b/src/core/lib/event_engine/posix_engine/timer_manager.h @@ -31,6 +31,7 @@ #include +#include "src/core/lib/event_engine/forkable.h" #include "src/core/lib/event_engine/posix_engine/timer.h" #include "src/core/lib/gprpp/sync.h" #include "src/core/lib/gprpp/thd.h" @@ -43,10 +44,10 @@ namespace posix_engine { // all times, and thus effectively preventing the thundering herd problem. // TODO(ctiller): consider unifying this thread pool and the one in // thread_pool.{h,cc}. -class TimerManager final { +class TimerManager final : public grpc_event_engine::experimental::Forkable { public: TimerManager(); - ~TimerManager(); + ~TimerManager() override; grpc_core::Timestamp Now() { return host_.Now(); } @@ -54,6 +55,11 @@ class TimerManager final { experimental::EventEngine::Closure* closure); bool TimerCancel(Timer* timer); + // Forkable + void PrepareFork() override; + void PostforkParent() override; + void PostforkChild() override; + private: struct RunThreadArgs { TimerManager* self; @@ -92,6 +98,8 @@ class TimerManager final { bool has_timed_waiter_ ABSL_GUARDED_BY(mu_) = false; // are we shutting down? bool shutdown_ ABSL_GUARDED_BY(mu_) = false; + // are we forking? + bool forking_ ABSL_GUARDED_BY(mu_) = false; // are we shutting down? bool kicked_ ABSL_GUARDED_BY(mu_) = false; // the deadline of the current timed waiter thread (only relevant if @@ -103,6 +111,7 @@ class TimerManager final { uint64_t wakeups_ ABSL_GUARDED_BY(mu_) = 0; // actual timer implementation std::unique_ptr timer_list_; + int prefork_thread_count_ = 0; }; } // namespace posix_engine diff --git a/src/core/lib/event_engine/thread_pool.cc b/src/core/lib/event_engine/thread_pool.cc index d45bd890a7f..efe288d5c5f 100644 --- a/src/core/lib/event_engine/thread_pool.cc +++ b/src/core/lib/event_engine/thread_pool.cc @@ -45,8 +45,9 @@ void ThreadPool::Thread::ThreadFunc() { // Move ourselves to dead list pool_->dead_threads_.push_back(this); - if ((pool_->shutdown_) && (pool_->nthreads_ == 0)) { - pool_->shutdown_cv_.Signal(); + if (pool_->nthreads_ == 0) { + if (pool_->forking_) pool_->fork_cv_.Signal(); + if (pool_->shutdown_) pool_->shutdown_cv_.Signal(); } } @@ -54,7 +55,7 @@ void ThreadPool::ThreadFunc() { for (;;) { // Wait until work is available or we are shutting down. grpc_core::ReleasableMutexLock lock(&mu_); - if (!shutdown_ && callbacks_.empty()) { + if (!forking_ && !shutdown_ && callbacks_.empty()) { // If there are too many threads waiting, then quit this thread if (threads_waiting_ >= reserve_threads_) { break; @@ -63,6 +64,8 @@ void ThreadPool::ThreadFunc() { cv_.Wait(&mu_); threads_waiting_--; } + // a fork could be initiated while the thread was waiting + if (forking_) return; // Drain callbacks before considering shutdown to ensure all work // gets completed. if (!callbacks_.empty()) { @@ -80,9 +83,14 @@ ThreadPool::ThreadPool(int reserve_threads) : shutdown_(false), reserve_threads_(reserve_threads), nthreads_(0), - threads_waiting_(0) { - for (int i = 0; i < reserve_threads_; i++) { - grpc_core::MutexLock lock(&mu_); + threads_waiting_(0), + forking_(false) { + grpc_core::MutexLock lock(&mu_); + StartNThreadsLocked(reserve_threads_); +} + +void ThreadPool::StartNThreadsLocked(int n) { + for (int i = 0; i < n; i++) { nthreads_++; new Thread(this); } @@ -107,6 +115,9 @@ void ThreadPool::Add(absl::AnyInvocable callback) { grpc_core::MutexLock lock(&mu_); // Add works to the callbacks list callbacks_.push(std::move(callback)); + // Store the callback for later if we are forking. + // TODO(hork): should we block instead? + if (forking_) return; // Increase pool size or notify as needed if (threads_waiting_ == 0) { // Kick off a new thread @@ -121,5 +132,27 @@ void ThreadPool::Add(absl::AnyInvocable callback) { } } +void ThreadPool::PrepareFork() { + grpc_core::MutexLock lock(&mu_); + forking_ = true; + cv_.SignalAll(); + while (nthreads_ != 0) { + fork_cv_.Wait(&mu_); + } + ReapThreads(&dead_threads_); +} + +void ThreadPool::PostforkParent() { + grpc_core::MutexLock lock(&mu_); + forking_ = false; + StartNThreadsLocked(reserve_threads_); +} + +void ThreadPool::PostforkChild() { + grpc_core::MutexLock lock(&mu_); + forking_ = false; + StartNThreadsLocked(reserve_threads_); +} + } // namespace experimental } // namespace grpc_event_engine diff --git a/src/core/lib/event_engine/thread_pool.h b/src/core/lib/event_engine/thread_pool.h index 34f93ba7a93..6613279cc5c 100644 --- a/src/core/lib/event_engine/thread_pool.h +++ b/src/core/lib/event_engine/thread_pool.h @@ -24,21 +24,28 @@ #include #include +#include "absl/base/thread_annotations.h" #include "absl/functional/any_invocable.h" +#include "src/core/lib/event_engine/forkable.h" #include "src/core/lib/gprpp/sync.h" #include "src/core/lib/gprpp/thd.h" namespace grpc_event_engine { namespace experimental { -class ThreadPool final { +class ThreadPool final : public grpc_event_engine::experimental::Forkable { public: explicit ThreadPool(int reserve_threads); - ~ThreadPool(); + ~ThreadPool() override; void Add(absl::AnyInvocable callback); + // Forkable + void PrepareFork() override; + void PostforkParent() override; + void PostforkChild() override; + private: class Thread { public: @@ -52,17 +59,20 @@ class ThreadPool final { }; void ThreadFunc(); + void StartNThreadsLocked(int n) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_); static void ReapThreads(std::vector* tlist); grpc_core::Mutex mu_; grpc_core::CondVar cv_; grpc_core::CondVar shutdown_cv_; + grpc_core::CondVar fork_cv_; bool shutdown_; std::queue> callbacks_; int reserve_threads_; int nthreads_; int threads_waiting_; std::vector dead_threads_; + bool forking_; }; } // namespace experimental diff --git a/src/core/lib/surface/init.cc b/src/core/lib/surface/init.cc index 60cad1bba78..b0b22992b46 100644 --- a/src/core/lib/surface/init.cc +++ b/src/core/lib/surface/init.cc @@ -37,6 +37,7 @@ #include "src/core/lib/channel/channel_stack_builder.h" #include "src/core/lib/config/core_configuration.h" #include "src/core/lib/debug/trace.h" +#include "src/core/lib/event_engine/forkable.h" #include "src/core/lib/gprpp/fork.h" #include "src/core/lib/gprpp/sync.h" #include "src/core/lib/gprpp/thd.h" @@ -147,6 +148,7 @@ void grpc_init(void) { g_shutting_down_cv->SignalAll(); } grpc_core::Fork::GlobalInit(); + grpc_event_engine::experimental::RegisterForkHandlers(); grpc_fork_handlers_auto_register(); grpc_core::ApplicationCallbackExecCtx::GlobalInit(); grpc_iomgr_init(); diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index 2166350b595..a286225286e 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -447,6 +447,7 @@ CORE_SOURCE_FILES = [ 'src/core/lib/event_engine/default_event_engine.cc', 'src/core/lib/event_engine/default_event_engine_factory.cc', 'src/core/lib/event_engine/executor/threaded_executor.cc', + 'src/core/lib/event_engine/forkable.cc', 'src/core/lib/event_engine/memory_allocator.cc', 'src/core/lib/event_engine/posix_engine/posix_engine.cc', 'src/core/lib/event_engine/posix_engine/timer.cc', diff --git a/test/core/event_engine/BUILD b/test/core/event_engine/BUILD index 72bea5d5230..2b06d1432d9 100644 --- a/test/core/event_engine/BUILD +++ b/test/core/event_engine/BUILD @@ -33,6 +33,21 @@ grpc_cc_test( ], ) +grpc_cc_test( + name = "forkable_test", + srcs = ["forkable_test.cc"], + external_deps = [ + "absl/time", + "gtest", + ], + deps = [ + "//:forkable", + "//:gpr_base", + "//:gpr_platform", + "//:grpc_unsecure", + ], +) + grpc_cc_test( name = "endpoint_config_test", srcs = ["endpoint_config_test.cc"], diff --git a/test/core/event_engine/forkable_test.cc b/test/core/event_engine/forkable_test.cc new file mode 100644 index 00000000000..c832eebe0e2 --- /dev/null +++ b/test/core/event_engine/forkable_test.cc @@ -0,0 +1,103 @@ +// Copyright 2022 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#ifndef GRPC_ENABLE_FORK_SUPPORT + +// Test nothing, everything is fine +int main(int /* argc */, char** /* argv */) { return 0; } + +#else // GRPC_ENABLE_FORK_SUPPORT + +#include +#include + +#include + +#include "absl/time/clock.h" + +#include +#include + +#include "src/core/lib/event_engine/forkable.h" + +namespace { +using ::grpc_event_engine::experimental::Forkable; +using ::grpc_event_engine::experimental::RegisterForkHandlers; +} // namespace + +class ForkableTest : public testing::Test {}; + +#ifdef GRPC_POSIX_FORK_ALLOW_PTHREAD_ATFORK +TEST_F(ForkableTest, BasicPthreadAtForkOperations) { + class SomeForkable : public Forkable { + public: + void PrepareFork() override { prepare_called_ = true; } + void PostforkParent() override { parent_called_ = true; } + void PostforkChild() override { child_called_ = true; } + + void CheckParent() { + EXPECT_TRUE(prepare_called_); + EXPECT_TRUE(parent_called_); + EXPECT_FALSE(child_called_); + } + + void CheckChild() { + EXPECT_TRUE(prepare_called_); + EXPECT_FALSE(parent_called_); + EXPECT_TRUE(child_called_); + } + + private: + bool prepare_called_ = false; + bool parent_called_ = false; + bool child_called_ = false; + }; + + SomeForkable forkable; + int child_pid = fork(); + ASSERT_NE(child_pid, -1); + if (child_pid == 0) { + gpr_log(GPR_DEBUG, "I am child pid: %d", getpid()); + forkable.CheckChild(); + exit(testing::Test::HasFailure()); + } else { + gpr_log(GPR_DEBUG, "I am parent pid: %d", getpid()); + forkable.CheckParent(); + int status; + gpr_log(GPR_DEBUG, "Waiting for child pid: %d", child_pid); + do { + // retry on EINTR, and fail otherwise + if (waitpid(child_pid, &status, 0) != -1) break; + ASSERT_EQ(errno, EINTR); + } while (true); + if (WIFEXITED(status)) { + ASSERT_EQ(WEXITSTATUS(status), 0); + } else { + // exited abnormally, fail and print the exit status + ASSERT_EQ(-99, status); + } + } +} +#endif // GRPC_POSIX_FORK_ALLOW_PTHREAD_ATFORK + +int main(int argc, char** argv) { + testing::InitGoogleTest(&argc, argv); + RegisterForkHandlers(); + auto result = RUN_ALL_TESTS(); + return result; +} + +#endif // GRPC_ENABLE_FORK_SUPPORT diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal index 2f5827ed915..e2b315d4934 100644 --- a/tools/doxygen/Doxyfile.c++.internal +++ b/tools/doxygen/Doxyfile.c++.internal @@ -1950,6 +1950,8 @@ src/core/lib/event_engine/default_event_engine_factory.h \ src/core/lib/event_engine/executor/executor.h \ src/core/lib/event_engine/executor/threaded_executor.cc \ src/core/lib/event_engine/executor/threaded_executor.h \ +src/core/lib/event_engine/forkable.cc \ +src/core/lib/event_engine/forkable.h \ src/core/lib/event_engine/handle_containers.h \ src/core/lib/event_engine/memory_allocator.cc \ src/core/lib/event_engine/poller.h \ diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index c99e329f9c8..d430986c721 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -1740,6 +1740,8 @@ src/core/lib/event_engine/default_event_engine_factory.h \ src/core/lib/event_engine/executor/executor.h \ src/core/lib/event_engine/executor/threaded_executor.cc \ src/core/lib/event_engine/executor/threaded_executor.h \ +src/core/lib/event_engine/forkable.cc \ +src/core/lib/event_engine/forkable.h \ src/core/lib/event_engine/handle_containers.h \ src/core/lib/event_engine/memory_allocator.cc \ src/core/lib/event_engine/poller.h \ diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json index 5e741726b29..da1930a52f2 100644 --- a/tools/run_tests/generated/tests.json +++ b/tools/run_tests/generated/tests.json @@ -3101,6 +3101,30 @@ ], "uses_polling": false }, + { + "args": [], + "benchmark": false, + "ci_platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "cpu_cost": 1.0, + "exclude_configs": [], + "exclude_iomgrs": [], + "flaky": false, + "gtest": true, + "language": "c++", + "name": "forkable_test", + "platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "uses_polling": true + }, { "args": [], "benchmark": false,