diff --git a/BUILD b/BUILD index 96342999d8a..3c9dec3b245 100644 --- a/BUILD +++ b/BUILD @@ -2466,7 +2466,6 @@ grpc_cc_library( external_deps = [ "absl/base:core_headers", "absl/functional:any_invocable", - "absl/time", ], deps = [ "forkable", diff --git a/CMakeLists.txt b/CMakeLists.txt index ccda5fbbef7..a478579336b 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1200,7 +1200,6 @@ if(gRPC_BUILD_TESTS) add_dependencies(buildtests_cxx test_cpp_util_time_test) add_dependencies(buildtests_cxx thd_test) add_dependencies(buildtests_cxx thread_manager_test) - add_dependencies(buildtests_cxx thread_pool_test) add_dependencies(buildtests_cxx thread_quota_test) if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) add_dependencies(buildtests_cxx thread_stress_test) @@ -18299,45 +18298,6 @@ target_link_libraries(thread_manager_test ) -endif() -if(gRPC_BUILD_TESTS) - -add_executable(thread_pool_test - src/core/lib/event_engine/forkable.cc - src/core/lib/event_engine/thread_pool.cc - test/core/event_engine/thread_pool_test.cc - third_party/googletest/googletest/src/gtest-all.cc - third_party/googletest/googlemock/src/gmock-all.cc -) - -target_include_directories(thread_pool_test - PRIVATE - ${CMAKE_CURRENT_SOURCE_DIR} - ${CMAKE_CURRENT_SOURCE_DIR}/include - ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR} - ${_gRPC_RE2_INCLUDE_DIR} - ${_gRPC_SSL_INCLUDE_DIR} - ${_gRPC_UPB_GENERATED_DIR} - ${_gRPC_UPB_GRPC_GENERATED_DIR} - ${_gRPC_UPB_INCLUDE_DIR} - ${_gRPC_XXHASH_INCLUDE_DIR} - ${_gRPC_ZLIB_INCLUDE_DIR} - third_party/googletest/googletest/include - third_party/googletest/googletest - third_party/googletest/googlemock/include - third_party/googletest/googlemock - ${_gRPC_PROTO_GENS_DIR} -) - -target_link_libraries(thread_pool_test - ${_gRPC_PROTOBUF_LIBRARIES} - ${_gRPC_ALLTARGETS_LIBRARIES} - absl::flat_hash_set - absl::any_invocable - gpr -) - - endif() if(gRPC_BUILD_TESTS) diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index 5d6ef262ef1..389e017985f 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -9985,21 +9985,6 @@ targets: deps: - grpc++_test_config - grpc++_test_util -- name: thread_pool_test - gtest: true - build: test - language: c++ - headers: - - src/core/lib/event_engine/forkable.h - - src/core/lib/event_engine/thread_pool.h - src: - - src/core/lib/event_engine/forkable.cc - - src/core/lib/event_engine/thread_pool.cc - - test/core/event_engine/thread_pool_test.cc - deps: - - absl/container:flat_hash_set - - absl/functional:any_invocable - - gpr - name: thread_quota_test gtest: true build: test diff --git a/src/core/lib/event_engine/thread_pool.cc b/src/core/lib/event_engine/thread_pool.cc index 12c61ae9226..ddc6c713597 100644 --- a/src/core/lib/event_engine/thread_pool.cc +++ b/src/core/lib/event_engine/thread_pool.cc @@ -20,147 +20,138 @@ #include "src/core/lib/event_engine/thread_pool.h" -#include <memory> #include <utility> -#include "absl/time/clock.h" -#include "absl/time/time.h" - -#include <grpc/support/log.h> - #include "src/core/lib/gprpp/thd.h" namespace grpc_event_engine { namespace experimental { -void ThreadPool::StartThread(StatePtr state) { - state->thread_count.Add(); - grpc_core::Thread( - "event_engine", - [](void* arg) { - ThreadFunc(*std::unique_ptr<StatePtr>(static_cast<StatePtr*>(arg))); - }, - new StatePtr(state), nullptr, - grpc_core::Thread::Options().set_tracked(false).set_joinable(false)) - .Start(); +ThreadPool::Thread::Thread(ThreadPool* pool) + : pool_(pool), + thd_( + "posix_eventengine_pool", + [](void* th) { static_cast<ThreadPool::Thread*>(th)->ThreadFunc(); }, + this, nullptr, grpc_core::Thread::Options().set_tracked(false)) { + thd_.Start(); } - -void ThreadPool::ThreadFunc(StatePtr state) { - while (state->queue.Step()) { +ThreadPool::Thread::~Thread() { thd_.Join(); } + +void ThreadPool::Thread::ThreadFunc() { + pool_->ThreadFunc(); + // Now that we have killed ourselves, we should reduce the thread count + grpc_core::MutexLock lock(&pool_->mu_); + pool_->nthreads_--; + // Move ourselves to dead list + pool_->dead_threads_.push_back(this); + + if (pool_->nthreads_ == 0) { + if (pool_->forking_) pool_->fork_cv_.Signal(); + if (pool_->shutdown_) pool_->shutdown_cv_.Signal(); } - state->thread_count.Remove(); } -bool ThreadPool::Queue::Step() { - grpc_core::ReleasableMutexLock lock(&mu_); - // Wait until work is available or we are shutting down. - while (state_ == State::kRunning && callbacks_.empty()) { - // If there are too many threads waiting, then quit this thread. - // TODO(ctiller): wait some time in this case to be sure. - if (threads_waiting_ >= reserve_threads_) return false; - threads_waiting_++; - cv_.Wait(&mu_); - threads_waiting_--; - } - switch (state_) { - case State::kRunning: +void ThreadPool::ThreadFunc() { + for (;;) { + // Wait until work is available or we are shutting down. + grpc_core::ReleasableMutexLock lock(&mu_); + if (!forking_ && !shutdown_ && callbacks_.empty()) { + // If there are too many threads waiting, then quit this thread + if (threads_waiting_ >= reserve_threads_) { + break; + } + threads_waiting_++; + cv_.Wait(&mu_); + threads_waiting_--; + } + // a fork could be initiated while the thread was waiting + if (forking_) return; + // Drain callbacks before considering shutdown to ensure all work + // gets completed. + if (!callbacks_.empty()) { + auto cb = std::move(callbacks_.front()); + callbacks_.pop(); + lock.Release(); + cb(); + } else if (shutdown_) { break; - case State::kShutdown: - case State::kForking: - if (!callbacks_.empty()) break; - return false; + } } - GPR_ASSERT(!callbacks_.empty()); - auto callback = std::move(callbacks_.front()); - callbacks_.pop(); - lock.Release(); - callback(); - return true; } ThreadPool::ThreadPool(int reserve_threads) - : reserve_threads_(reserve_threads) { - for (int i = 0; i < reserve_threads; i++) { - StartThread(state_); - } + : shutdown_(false), + reserve_threads_(reserve_threads), + nthreads_(0), + threads_waiting_(0), + forking_(false) { + grpc_core::MutexLock lock(&mu_); + StartNThreadsLocked(reserve_threads_); } -ThreadPool::~ThreadPool() { state_->queue.SetShutdown(); } - -void ThreadPool::Add(absl::AnyInvocable<void()> callback) { - if (state_->queue.Add(std::move(callback))) { - StartThread(state_); +void ThreadPool::StartNThreadsLocked(int n) { + for (int i = 0; i < n; i++) { + nthreads_++; + new Thread(this); } } -bool ThreadPool::Queue::Add(absl::AnyInvocable<void()> callback) { - grpc_core::MutexLock lock(&mu_); - // Add works to the callbacks list - callbacks_.push(std::move(callback)); - cv_.Signal(); - switch (state_) { - case State::kRunning: - case State::kShutdown: - return threads_waiting_ == 0; - case State::kForking: - return false; - } +void ThreadPool::ReapThreads(std::vector<Thread*>* tlist) { + for (auto* t : *tlist) delete t; + tlist->clear(); } -void ThreadPool::Queue::SetState(State state) { +ThreadPool::~ThreadPool() { grpc_core::MutexLock lock(&mu_); - if (state == State::kRunning) { - GPR_ASSERT(state_ != State::kRunning); - } else { - GPR_ASSERT(state_ == State::kRunning); - } - state_ = state; + shutdown_ = true; cv_.SignalAll(); + while (nthreads_ != 0) { + shutdown_cv_.Wait(&mu_); + } + ReapThreads(&dead_threads_); } -void ThreadPool::ThreadCount::Add() { - grpc_core::MutexLock lock(&mu_); - ++threads_; -} - -void ThreadPool::ThreadCount::Remove() { +void ThreadPool::Add(absl::AnyInvocable<void()> callback) { grpc_core::MutexLock lock(&mu_); - --threads_; - if (threads_ == 0) { + // Add works to the callbacks list + callbacks_.push(std::move(callback)); + // Store the callback for later if we are forking. + // TODO(hork): should we block instead? + if (forking_) return; + // Increase pool size or notify as needed + if (threads_waiting_ == 0) { + // Kick off a new thread + nthreads_++; + new Thread(this); + } else { cv_.Signal(); } + // Also use this chance to harvest dead threads + if (!dead_threads_.empty()) { + ReapThreads(&dead_threads_); + } } -void ThreadPool::ThreadCount::Quiesce() { +void ThreadPool::PrepareFork() { grpc_core::MutexLock lock(&mu_); - auto last_log = absl::Now(); - while (threads_ > 0) { - // Wait for all threads to exit. - // At least once every three seconds (but no faster than once per second in - // the event of spurious wakeups) log a message indicating we're waiting to - // fork. - cv_.WaitWithTimeout(&mu_, absl::Seconds(3)); - if (threads_ > 0 && absl::Now() - last_log > absl::Seconds(1)) { - gpr_log(GPR_ERROR, "Waiting for thread pool to idle before forking"); - last_log = absl::Now(); - } + forking_ = true; + cv_.SignalAll(); + while (nthreads_ != 0) { + fork_cv_.Wait(&mu_); } + ReapThreads(&dead_threads_); } -void ThreadPool::PrepareFork() { - state_->queue.SetForking(); - state_->thread_count.Quiesce(); +void ThreadPool::PostforkParent() { + grpc_core::MutexLock lock(&mu_); + forking_ = false; + StartNThreadsLocked(reserve_threads_); } -void ThreadPool::PostforkParent() { Postfork(); } - -void ThreadPool::PostforkChild() { Postfork(); } - -void ThreadPool::Postfork() { - state_->queue.Reset(); - for (int i = 0; i < reserve_threads_; i++) { - StartThread(state_); - } +void ThreadPool::PostforkChild() { + grpc_core::MutexLock lock(&mu_); + forking_ = false; + StartNThreadsLocked(reserve_threads_); } } // namespace experimental diff --git a/src/core/lib/event_engine/thread_pool.h b/src/core/lib/event_engine/thread_pool.h index dad1f282922..6613279cc5c 100644 --- a/src/core/lib/event_engine/thread_pool.h +++ b/src/core/lib/event_engine/thread_pool.h @@ -21,14 +21,15 @@ #include <grpc/support/port_platform.h> -#include <memory> #include <queue> +#include <vector> #include "absl/base/thread_annotations.h" #include "absl/functional/any_invocable.h" #include "src/core/lib/event_engine/forkable.h" #include "src/core/lib/gprpp/sync.h" +#include "src/core/lib/gprpp/thd.h" namespace grpc_event_engine { namespace experimental { @@ -46,57 +47,32 @@ class ThreadPool final : public grpc_event_engine::experimental::Forkable { void PostforkChild() override; private: - class Queue { + class Thread { public: - explicit Queue(int reserve_threads) : reserve_threads_(reserve_threads) {} - bool Step(); - void SetShutdown() { SetState(State::kShutdown); } - void SetForking() { SetState(State::kForking); } - // Add a callback to the queue. - // Return true if we should also spin up a new thread. - bool Add(absl::AnyInvocable<void()> callback); - void Reset() { SetState(State::kRunning); } + explicit Thread(ThreadPool* pool); + ~Thread(); private: - enum class State { kRunning, kShutdown, kForking }; - - void SetState(State state); - - grpc_core::Mutex mu_; - grpc_core::CondVar cv_; - std::queue<absl::AnyInvocable<void()>> callbacks_ ABSL_GUARDED_BY(mu_); - int threads_waiting_ ABSL_GUARDED_BY(mu_) = 0; - const int reserve_threads_; - State state_ ABSL_GUARDED_BY(mu_) = State::kRunning; + ThreadPool* pool_; + grpc_core::Thread thd_; + void ThreadFunc(); }; - class ThreadCount { - public: - void Add(); - void Remove(); - // Block until all threads have stopped. - void Quiesce(); - - private: - grpc_core::Mutex mu_; - grpc_core::CondVar cv_; - int threads_ ABSL_GUARDED_BY(mu_) = 0; - }; - - struct State { - explicit State(int reserve_threads) : queue(reserve_threads) {} - Queue queue; - ThreadCount thread_count; - }; - - using StatePtr = std::shared_ptr<State>; - - static void ThreadFunc(StatePtr state); - static void StartThread(StatePtr state); - void Postfork(); - - const int reserve_threads_; - const StatePtr state_ = std::make_shared<State>(reserve_threads_); + void ThreadFunc(); + void StartNThreadsLocked(int n) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_); + static void ReapThreads(std::vector<Thread*>* tlist); + + grpc_core::Mutex mu_; + grpc_core::CondVar cv_; + grpc_core::CondVar shutdown_cv_; + grpc_core::CondVar fork_cv_; + bool shutdown_; + std::queue<absl::AnyInvocable<void()>> callbacks_; + int reserve_threads_; + int nthreads_; + int threads_waiting_; + std::vector<Thread*> dead_threads_; + bool forking_; }; } // namespace experimental diff --git a/test/core/event_engine/BUILD b/test/core/event_engine/BUILD index 53ac8b38218..dea38abe6bc 100644 --- a/test/core/event_engine/BUILD +++ b/test/core/event_engine/BUILD @@ -48,19 +48,6 @@ grpc_cc_test( ], ) -grpc_cc_test( - name = "thread_pool_test", - srcs = ["thread_pool_test.cc"], - external_deps = [ - "absl/synchronization", - "gtest", - ], - deps = [ - "//:event_engine_thread_pool", - "//:gpr", - ], -) - grpc_cc_test( name = "endpoint_config_test", srcs = ["endpoint_config_test.cc"], diff --git a/test/core/event_engine/thread_pool_test.cc b/test/core/event_engine/thread_pool_test.cc deleted file mode 100644 index 8af63bcc193..00000000000 --- a/test/core/event_engine/thread_pool_test.cc +++ /dev/null @@ -1,98 +0,0 @@ -// Copyright 2022 The gRPC Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "src/core/lib/event_engine/thread_pool.h" - -#include <atomic> -#include <chrono> -#include <thread> - -#include <gtest/gtest.h> - -#include "absl/synchronization/notification.h" -#include "gtest/gtest.h" - -#include <grpc/support/log.h> - -namespace grpc_event_engine { -namespace experimental { - -TEST(ThreadPoolTest, CanRunClosure) { - ThreadPool p(1); - absl::Notification n; - p.Add([&n] { n.Notify(); }); - n.WaitForNotification(); -} - -TEST(ThreadPoolTest, CanDestroyInsideClosure) { - auto p = std::make_shared<ThreadPool>(1); - p->Add([p]() { std::this_thread::sleep_for(std::chrono::seconds(1)); }); -} - -TEST(ThreadPoolTest, CanSurviveFork) { - ThreadPool p(1); - absl::Notification n; - gpr_log(GPR_INFO, "add callback 1"); - p.Add([&n, &p] { - std::this_thread::sleep_for(std::chrono::seconds(1)); - gpr_log(GPR_INFO, "add callback 2"); - p.Add([&n] { - std::this_thread::sleep_for(std::chrono::seconds(1)); - gpr_log(GPR_INFO, "notify"); - n.Notify(); - }); - }); - gpr_log(GPR_INFO, "prepare fork"); - p.PrepareFork(); - gpr_log(GPR_INFO, "wait for notification"); - n.WaitForNotification(); - gpr_log(GPR_INFO, "postfork child"); - p.PostforkChild(); - absl::Notification n2; - gpr_log(GPR_INFO, "add callback 3"); - p.Add([&n2] { - gpr_log(GPR_INFO, "notify"); - n2.Notify(); - }); - gpr_log(GPR_INFO, "wait for notification"); - n2.WaitForNotification(); -} - -void ScheduleSelf(ThreadPool* p) { - p->Add([p] { ScheduleSelf(p); }); -} - -TEST(ThreadPoolDeathTest, CanDetectStucknessAtFork) { - ASSERT_DEATH_IF_SUPPORTED( - [] { - gpr_set_log_verbosity(GPR_LOG_SEVERITY_ERROR); - ThreadPool p(1); - ScheduleSelf(&p); - std::thread terminator([] { - std::this_thread::sleep_for(std::chrono::seconds(10)); - abort(); - }); - p.PrepareFork(); - }(), - "Waiting for thread pool to idle before forking"); -} - -} // namespace experimental -} // namespace grpc_event_engine - -int main(int argc, char** argv) { - gpr_log_verbosity_init(); - ::testing::InitGoogleTest(&argc, argv); - return RUN_ALL_TESTS(); -} diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json index 0a0ac9971b8..57f30f9ed27 100644 --- a/tools/run_tests/generated/tests.json +++ b/tools/run_tests/generated/tests.json @@ -7205,30 +7205,6 @@ ], "uses_polling": true }, - { - "args": [], - "benchmark": false, - "ci_platforms": [ - "linux", - "mac", - "posix", - "windows" - ], - "cpu_cost": 1.0, - "exclude_configs": [], - "exclude_iomgrs": [], - "flaky": false, - "gtest": true, - "language": "c++", - "name": "thread_pool_test", - "platforms": [ - "linux", - "mac", - "posix", - "windows" - ], - "uses_polling": true - }, { "args": [], "benchmark": false,