diff --git a/BUILD b/BUILD index fe9bcff5150..02a76ce1760 100644 --- a/BUILD +++ b/BUILD @@ -2466,6 +2466,7 @@ grpc_cc_library( external_deps = [ "absl/base:core_headers", "absl/functional:any_invocable", + "absl/time", ], deps = [ "forkable", diff --git a/CMakeLists.txt b/CMakeLists.txt index d8b3274ddfc..08f05cbc498 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1201,6 +1201,7 @@ if(gRPC_BUILD_TESTS) add_dependencies(buildtests_cxx test_cpp_util_time_test) add_dependencies(buildtests_cxx thd_test) add_dependencies(buildtests_cxx thread_manager_test) + add_dependencies(buildtests_cxx thread_pool_test) add_dependencies(buildtests_cxx thread_quota_test) if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) add_dependencies(buildtests_cxx thread_stress_test) @@ -18325,6 +18326,45 @@ target_link_libraries(thread_manager_test ) +endif() +if(gRPC_BUILD_TESTS) + +add_executable(thread_pool_test + src/core/lib/event_engine/forkable.cc + src/core/lib/event_engine/thread_pool.cc + test/core/event_engine/thread_pool_test.cc + third_party/googletest/googletest/src/gtest-all.cc + third_party/googletest/googlemock/src/gmock-all.cc +) + +target_include_directories(thread_pool_test + PRIVATE + ${CMAKE_CURRENT_SOURCE_DIR} + ${CMAKE_CURRENT_SOURCE_DIR}/include + ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR} + ${_gRPC_RE2_INCLUDE_DIR} + ${_gRPC_SSL_INCLUDE_DIR} + ${_gRPC_UPB_GENERATED_DIR} + ${_gRPC_UPB_GRPC_GENERATED_DIR} + ${_gRPC_UPB_INCLUDE_DIR} + ${_gRPC_XXHASH_INCLUDE_DIR} + ${_gRPC_ZLIB_INCLUDE_DIR} + third_party/googletest/googletest/include + third_party/googletest/googletest + third_party/googletest/googlemock/include + third_party/googletest/googlemock + ${_gRPC_PROTO_GENS_DIR} +) + +target_link_libraries(thread_pool_test + ${_gRPC_PROTOBUF_LIBRARIES} + ${_gRPC_ALLTARGETS_LIBRARIES} + absl::flat_hash_set + absl::any_invocable + gpr +) + + endif() if(gRPC_BUILD_TESTS) diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index 5151e15224c..7181793f7ca 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -9981,6 +9981,21 @@ targets: deps: - grpc++_test_config - grpc++_test_util +- name: thread_pool_test + gtest: true + build: test + language: c++ + headers: + - src/core/lib/event_engine/forkable.h + - src/core/lib/event_engine/thread_pool.h + src: + - src/core/lib/event_engine/forkable.cc + - src/core/lib/event_engine/thread_pool.cc + - test/core/event_engine/thread_pool_test.cc + deps: + - absl/container:flat_hash_set + - absl/functional:any_invocable + - gpr - name: thread_quota_test gtest: true build: test diff --git a/src/core/lib/event_engine/thread_pool.cc b/src/core/lib/event_engine/thread_pool.cc index ddc6c713597..12c61ae9226 100644 --- a/src/core/lib/event_engine/thread_pool.cc +++ b/src/core/lib/event_engine/thread_pool.cc @@ -20,138 +20,147 @@ #include "src/core/lib/event_engine/thread_pool.h" +#include #include +#include "absl/time/clock.h" +#include "absl/time/time.h" + +#include + #include "src/core/lib/gprpp/thd.h" namespace grpc_event_engine { namespace experimental { -ThreadPool::Thread::Thread(ThreadPool* pool) - : pool_(pool), - thd_( - "posix_eventengine_pool", - [](void* th) { static_cast(th)->ThreadFunc(); }, - this, nullptr, grpc_core::Thread::Options().set_tracked(false)) { - thd_.Start(); +void ThreadPool::StartThread(StatePtr state) { + state->thread_count.Add(); + grpc_core::Thread( + "event_engine", + [](void* arg) { + ThreadFunc(*std::unique_ptr(static_cast(arg))); + }, + new StatePtr(state), nullptr, + grpc_core::Thread::Options().set_tracked(false).set_joinable(false)) + .Start(); } -ThreadPool::Thread::~Thread() { thd_.Join(); } - -void ThreadPool::Thread::ThreadFunc() { - pool_->ThreadFunc(); - // Now that we have killed ourselves, we should reduce the thread count - grpc_core::MutexLock lock(&pool_->mu_); - pool_->nthreads_--; - // Move ourselves to dead list - pool_->dead_threads_.push_back(this); - - if (pool_->nthreads_ == 0) { - if (pool_->forking_) pool_->fork_cv_.Signal(); - if (pool_->shutdown_) pool_->shutdown_cv_.Signal(); + +void ThreadPool::ThreadFunc(StatePtr state) { + while (state->queue.Step()) { } + state->thread_count.Remove(); } -void ThreadPool::ThreadFunc() { - for (;;) { - // Wait until work is available or we are shutting down. - grpc_core::ReleasableMutexLock lock(&mu_); - if (!forking_ && !shutdown_ && callbacks_.empty()) { - // If there are too many threads waiting, then quit this thread - if (threads_waiting_ >= reserve_threads_) { - break; - } - threads_waiting_++; - cv_.Wait(&mu_); - threads_waiting_--; - } - // a fork could be initiated while the thread was waiting - if (forking_) return; - // Drain callbacks before considering shutdown to ensure all work - // gets completed. - if (!callbacks_.empty()) { - auto cb = std::move(callbacks_.front()); - callbacks_.pop(); - lock.Release(); - cb(); - } else if (shutdown_) { +bool ThreadPool::Queue::Step() { + grpc_core::ReleasableMutexLock lock(&mu_); + // Wait until work is available or we are shutting down. + while (state_ == State::kRunning && callbacks_.empty()) { + // If there are too many threads waiting, then quit this thread. + // TODO(ctiller): wait some time in this case to be sure. + if (threads_waiting_ >= reserve_threads_) return false; + threads_waiting_++; + cv_.Wait(&mu_); + threads_waiting_--; + } + switch (state_) { + case State::kRunning: break; - } + case State::kShutdown: + case State::kForking: + if (!callbacks_.empty()) break; + return false; } + GPR_ASSERT(!callbacks_.empty()); + auto callback = std::move(callbacks_.front()); + callbacks_.pop(); + lock.Release(); + callback(); + return true; } ThreadPool::ThreadPool(int reserve_threads) - : shutdown_(false), - reserve_threads_(reserve_threads), - nthreads_(0), - threads_waiting_(0), - forking_(false) { - grpc_core::MutexLock lock(&mu_); - StartNThreadsLocked(reserve_threads_); + : reserve_threads_(reserve_threads) { + for (int i = 0; i < reserve_threads; i++) { + StartThread(state_); + } } -void ThreadPool::StartNThreadsLocked(int n) { - for (int i = 0; i < n; i++) { - nthreads_++; - new Thread(this); +ThreadPool::~ThreadPool() { state_->queue.SetShutdown(); } + +void ThreadPool::Add(absl::AnyInvocable callback) { + if (state_->queue.Add(std::move(callback))) { + StartThread(state_); } } -void ThreadPool::ReapThreads(std::vector* tlist) { - for (auto* t : *tlist) delete t; - tlist->clear(); +bool ThreadPool::Queue::Add(absl::AnyInvocable callback) { + grpc_core::MutexLock lock(&mu_); + // Add works to the callbacks list + callbacks_.push(std::move(callback)); + cv_.Signal(); + switch (state_) { + case State::kRunning: + case State::kShutdown: + return threads_waiting_ == 0; + case State::kForking: + return false; + } } -ThreadPool::~ThreadPool() { +void ThreadPool::Queue::SetState(State state) { grpc_core::MutexLock lock(&mu_); - shutdown_ = true; - cv_.SignalAll(); - while (nthreads_ != 0) { - shutdown_cv_.Wait(&mu_); + if (state == State::kRunning) { + GPR_ASSERT(state_ != State::kRunning); + } else { + GPR_ASSERT(state_ == State::kRunning); } - ReapThreads(&dead_threads_); + state_ = state; + cv_.SignalAll(); } -void ThreadPool::Add(absl::AnyInvocable callback) { +void ThreadPool::ThreadCount::Add() { grpc_core::MutexLock lock(&mu_); - // Add works to the callbacks list - callbacks_.push(std::move(callback)); - // Store the callback for later if we are forking. - // TODO(hork): should we block instead? - if (forking_) return; - // Increase pool size or notify as needed - if (threads_waiting_ == 0) { - // Kick off a new thread - nthreads_++; - new Thread(this); - } else { + ++threads_; +} + +void ThreadPool::ThreadCount::Remove() { + grpc_core::MutexLock lock(&mu_); + --threads_; + if (threads_ == 0) { cv_.Signal(); } - // Also use this chance to harvest dead threads - if (!dead_threads_.empty()) { - ReapThreads(&dead_threads_); - } } -void ThreadPool::PrepareFork() { +void ThreadPool::ThreadCount::Quiesce() { grpc_core::MutexLock lock(&mu_); - forking_ = true; - cv_.SignalAll(); - while (nthreads_ != 0) { - fork_cv_.Wait(&mu_); + auto last_log = absl::Now(); + while (threads_ > 0) { + // Wait for all threads to exit. + // At least once every three seconds (but no faster than once per second in + // the event of spurious wakeups) log a message indicating we're waiting to + // fork. + cv_.WaitWithTimeout(&mu_, absl::Seconds(3)); + if (threads_ > 0 && absl::Now() - last_log > absl::Seconds(1)) { + gpr_log(GPR_ERROR, "Waiting for thread pool to idle before forking"); + last_log = absl::Now(); + } } - ReapThreads(&dead_threads_); } -void ThreadPool::PostforkParent() { - grpc_core::MutexLock lock(&mu_); - forking_ = false; - StartNThreadsLocked(reserve_threads_); +void ThreadPool::PrepareFork() { + state_->queue.SetForking(); + state_->thread_count.Quiesce(); } -void ThreadPool::PostforkChild() { - grpc_core::MutexLock lock(&mu_); - forking_ = false; - StartNThreadsLocked(reserve_threads_); +void ThreadPool::PostforkParent() { Postfork(); } + +void ThreadPool::PostforkChild() { Postfork(); } + +void ThreadPool::Postfork() { + state_->queue.Reset(); + for (int i = 0; i < reserve_threads_; i++) { + StartThread(state_); + } } } // namespace experimental diff --git a/src/core/lib/event_engine/thread_pool.h b/src/core/lib/event_engine/thread_pool.h index 6613279cc5c..dad1f282922 100644 --- a/src/core/lib/event_engine/thread_pool.h +++ b/src/core/lib/event_engine/thread_pool.h @@ -21,15 +21,14 @@ #include +#include #include -#include #include "absl/base/thread_annotations.h" #include "absl/functional/any_invocable.h" #include "src/core/lib/event_engine/forkable.h" #include "src/core/lib/gprpp/sync.h" -#include "src/core/lib/gprpp/thd.h" namespace grpc_event_engine { namespace experimental { @@ -47,32 +46,57 @@ class ThreadPool final : public grpc_event_engine::experimental::Forkable { void PostforkChild() override; private: - class Thread { + class Queue { public: - explicit Thread(ThreadPool* pool); - ~Thread(); + explicit Queue(int reserve_threads) : reserve_threads_(reserve_threads) {} + bool Step(); + void SetShutdown() { SetState(State::kShutdown); } + void SetForking() { SetState(State::kForking); } + // Add a callback to the queue. + // Return true if we should also spin up a new thread. + bool Add(absl::AnyInvocable callback); + void Reset() { SetState(State::kRunning); } private: - ThreadPool* pool_; - grpc_core::Thread thd_; - void ThreadFunc(); + enum class State { kRunning, kShutdown, kForking }; + + void SetState(State state); + + grpc_core::Mutex mu_; + grpc_core::CondVar cv_; + std::queue> callbacks_ ABSL_GUARDED_BY(mu_); + int threads_waiting_ ABSL_GUARDED_BY(mu_) = 0; + const int reserve_threads_; + State state_ ABSL_GUARDED_BY(mu_) = State::kRunning; }; - void ThreadFunc(); - void StartNThreadsLocked(int n) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_); - static void ReapThreads(std::vector* tlist); - - grpc_core::Mutex mu_; - grpc_core::CondVar cv_; - grpc_core::CondVar shutdown_cv_; - grpc_core::CondVar fork_cv_; - bool shutdown_; - std::queue> callbacks_; - int reserve_threads_; - int nthreads_; - int threads_waiting_; - std::vector dead_threads_; - bool forking_; + class ThreadCount { + public: + void Add(); + void Remove(); + // Block until all threads have stopped. + void Quiesce(); + + private: + grpc_core::Mutex mu_; + grpc_core::CondVar cv_; + int threads_ ABSL_GUARDED_BY(mu_) = 0; + }; + + struct State { + explicit State(int reserve_threads) : queue(reserve_threads) {} + Queue queue; + ThreadCount thread_count; + }; + + using StatePtr = std::shared_ptr; + + static void ThreadFunc(StatePtr state); + static void StartThread(StatePtr state); + void Postfork(); + + const int reserve_threads_; + const StatePtr state_ = std::make_shared(reserve_threads_); }; } // namespace experimental diff --git a/test/core/event_engine/BUILD b/test/core/event_engine/BUILD index dea38abe6bc..53ac8b38218 100644 --- a/test/core/event_engine/BUILD +++ b/test/core/event_engine/BUILD @@ -48,6 +48,19 @@ grpc_cc_test( ], ) +grpc_cc_test( + name = "thread_pool_test", + srcs = ["thread_pool_test.cc"], + external_deps = [ + "absl/synchronization", + "gtest", + ], + deps = [ + "//:event_engine_thread_pool", + "//:gpr", + ], +) + grpc_cc_test( name = "endpoint_config_test", srcs = ["endpoint_config_test.cc"], diff --git a/test/core/event_engine/thread_pool_test.cc b/test/core/event_engine/thread_pool_test.cc new file mode 100644 index 00000000000..8af63bcc193 --- /dev/null +++ b/test/core/event_engine/thread_pool_test.cc @@ -0,0 +1,98 @@ +// Copyright 2022 The gRPC Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "src/core/lib/event_engine/thread_pool.h" + +#include +#include +#include + +#include + +#include "absl/synchronization/notification.h" +#include "gtest/gtest.h" + +#include + +namespace grpc_event_engine { +namespace experimental { + +TEST(ThreadPoolTest, CanRunClosure) { + ThreadPool p(1); + absl::Notification n; + p.Add([&n] { n.Notify(); }); + n.WaitForNotification(); +} + +TEST(ThreadPoolTest, CanDestroyInsideClosure) { + auto p = std::make_shared(1); + p->Add([p]() { std::this_thread::sleep_for(std::chrono::seconds(1)); }); +} + +TEST(ThreadPoolTest, CanSurviveFork) { + ThreadPool p(1); + absl::Notification n; + gpr_log(GPR_INFO, "add callback 1"); + p.Add([&n, &p] { + std::this_thread::sleep_for(std::chrono::seconds(1)); + gpr_log(GPR_INFO, "add callback 2"); + p.Add([&n] { + std::this_thread::sleep_for(std::chrono::seconds(1)); + gpr_log(GPR_INFO, "notify"); + n.Notify(); + }); + }); + gpr_log(GPR_INFO, "prepare fork"); + p.PrepareFork(); + gpr_log(GPR_INFO, "wait for notification"); + n.WaitForNotification(); + gpr_log(GPR_INFO, "postfork child"); + p.PostforkChild(); + absl::Notification n2; + gpr_log(GPR_INFO, "add callback 3"); + p.Add([&n2] { + gpr_log(GPR_INFO, "notify"); + n2.Notify(); + }); + gpr_log(GPR_INFO, "wait for notification"); + n2.WaitForNotification(); +} + +void ScheduleSelf(ThreadPool* p) { + p->Add([p] { ScheduleSelf(p); }); +} + +TEST(ThreadPoolDeathTest, CanDetectStucknessAtFork) { + ASSERT_DEATH_IF_SUPPORTED( + [] { + gpr_set_log_verbosity(GPR_LOG_SEVERITY_ERROR); + ThreadPool p(1); + ScheduleSelf(&p); + std::thread terminator([] { + std::this_thread::sleep_for(std::chrono::seconds(10)); + abort(); + }); + p.PrepareFork(); + }(), + "Waiting for thread pool to idle before forking"); +} + +} // namespace experimental +} // namespace grpc_event_engine + +int main(int argc, char** argv) { + gpr_log_verbosity_init(); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json index 57f30f9ed27..0a0ac9971b8 100644 --- a/tools/run_tests/generated/tests.json +++ b/tools/run_tests/generated/tests.json @@ -7205,6 +7205,30 @@ ], "uses_polling": true }, + { + "args": [], + "benchmark": false, + "ci_platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "cpu_cost": 1.0, + "exclude_configs": [], + "exclude_iomgrs": [], + "flaky": false, + "gtest": true, + "language": "c++", + "name": "thread_pool_test", + "platforms": [ + "linux", + "mac", + "posix", + "windows" + ], + "uses_polling": true + }, { "args": [], "benchmark": false,