[event_engine] Thread pool that can handle deletion in a callback (#30763)

* [event_engine] Thread pool that can handle deletion in a callback

* missed file

* Automated change: Fix sanity tests

* simplify memory model

* review feedback

* Automated change: Fix sanity tests

* detect-stuckness

* fix

* Automated change: Fix sanity tests

Co-authored-by: ctiller <ctiller@users.noreply.github.com>
pull/30964/head
Craig Tiller 2 years ago committed by GitHub
parent cb3d7a9e9e
commit f6e1cf1dc0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      BUILD
  2. 40
      CMakeLists.txt
  3. 15
      build_autogenerated.yaml
  4. 197
      src/core/lib/event_engine/thread_pool.cc
  5. 70
      src/core/lib/event_engine/thread_pool.h
  6. 13
      test/core/event_engine/BUILD
  7. 98
      test/core/event_engine/thread_pool_test.cc
  8. 24
      tools/run_tests/generated/tests.json

@ -2466,6 +2466,7 @@ grpc_cc_library(
external_deps = [
"absl/base:core_headers",
"absl/functional:any_invocable",
"absl/time",
],
deps = [
"forkable",

40
CMakeLists.txt generated

@ -1201,6 +1201,7 @@ if(gRPC_BUILD_TESTS)
add_dependencies(buildtests_cxx test_cpp_util_time_test)
add_dependencies(buildtests_cxx thd_test)
add_dependencies(buildtests_cxx thread_manager_test)
add_dependencies(buildtests_cxx thread_pool_test)
add_dependencies(buildtests_cxx thread_quota_test)
if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
add_dependencies(buildtests_cxx thread_stress_test)
@ -18325,6 +18326,45 @@ target_link_libraries(thread_manager_test
)
endif()
if(gRPC_BUILD_TESTS)
add_executable(thread_pool_test
src/core/lib/event_engine/forkable.cc
src/core/lib/event_engine/thread_pool.cc
test/core/event_engine/thread_pool_test.cc
third_party/googletest/googletest/src/gtest-all.cc
third_party/googletest/googlemock/src/gmock-all.cc
)
target_include_directories(thread_pool_test
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/include
${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
${_gRPC_RE2_INCLUDE_DIR}
${_gRPC_SSL_INCLUDE_DIR}
${_gRPC_UPB_GENERATED_DIR}
${_gRPC_UPB_GRPC_GENERATED_DIR}
${_gRPC_UPB_INCLUDE_DIR}
${_gRPC_XXHASH_INCLUDE_DIR}
${_gRPC_ZLIB_INCLUDE_DIR}
third_party/googletest/googletest/include
third_party/googletest/googletest
third_party/googletest/googlemock/include
third_party/googletest/googlemock
${_gRPC_PROTO_GENS_DIR}
)
target_link_libraries(thread_pool_test
${_gRPC_PROTOBUF_LIBRARIES}
${_gRPC_ALLTARGETS_LIBRARIES}
absl::flat_hash_set
absl::any_invocable
gpr
)
endif()
if(gRPC_BUILD_TESTS)

@ -9981,6 +9981,21 @@ targets:
deps:
- grpc++_test_config
- grpc++_test_util
- name: thread_pool_test
gtest: true
build: test
language: c++
headers:
- src/core/lib/event_engine/forkable.h
- src/core/lib/event_engine/thread_pool.h
src:
- src/core/lib/event_engine/forkable.cc
- src/core/lib/event_engine/thread_pool.cc
- test/core/event_engine/thread_pool_test.cc
deps:
- absl/container:flat_hash_set
- absl/functional:any_invocable
- gpr
- name: thread_quota_test
gtest: true
build: test

@ -20,138 +20,147 @@
#include "src/core/lib/event_engine/thread_pool.h"
#include <memory>
#include <utility>
#include "absl/time/clock.h"
#include "absl/time/time.h"
#include <grpc/support/log.h>
#include "src/core/lib/gprpp/thd.h"
namespace grpc_event_engine {
namespace experimental {
ThreadPool::Thread::Thread(ThreadPool* pool)
: pool_(pool),
thd_(
"posix_eventengine_pool",
[](void* th) { static_cast<ThreadPool::Thread*>(th)->ThreadFunc(); },
this, nullptr, grpc_core::Thread::Options().set_tracked(false)) {
thd_.Start();
void ThreadPool::StartThread(StatePtr state) {
state->thread_count.Add();
grpc_core::Thread(
"event_engine",
[](void* arg) {
ThreadFunc(*std::unique_ptr<StatePtr>(static_cast<StatePtr*>(arg)));
},
new StatePtr(state), nullptr,
grpc_core::Thread::Options().set_tracked(false).set_joinable(false))
.Start();
}
ThreadPool::Thread::~Thread() { thd_.Join(); }
void ThreadPool::Thread::ThreadFunc() {
pool_->ThreadFunc();
// Now that we have killed ourselves, we should reduce the thread count
grpc_core::MutexLock lock(&pool_->mu_);
pool_->nthreads_--;
// Move ourselves to dead list
pool_->dead_threads_.push_back(this);
if (pool_->nthreads_ == 0) {
if (pool_->forking_) pool_->fork_cv_.Signal();
if (pool_->shutdown_) pool_->shutdown_cv_.Signal();
void ThreadPool::ThreadFunc(StatePtr state) {
while (state->queue.Step()) {
}
state->thread_count.Remove();
}
void ThreadPool::ThreadFunc() {
for (;;) {
// Wait until work is available or we are shutting down.
grpc_core::ReleasableMutexLock lock(&mu_);
if (!forking_ && !shutdown_ && callbacks_.empty()) {
// If there are too many threads waiting, then quit this thread
if (threads_waiting_ >= reserve_threads_) {
break;
}
threads_waiting_++;
cv_.Wait(&mu_);
threads_waiting_--;
}
// a fork could be initiated while the thread was waiting
if (forking_) return;
// Drain callbacks before considering shutdown to ensure all work
// gets completed.
if (!callbacks_.empty()) {
auto cb = std::move(callbacks_.front());
callbacks_.pop();
lock.Release();
cb();
} else if (shutdown_) {
bool ThreadPool::Queue::Step() {
grpc_core::ReleasableMutexLock lock(&mu_);
// Wait until work is available or we are shutting down.
while (state_ == State::kRunning && callbacks_.empty()) {
// If there are too many threads waiting, then quit this thread.
// TODO(ctiller): wait some time in this case to be sure.
if (threads_waiting_ >= reserve_threads_) return false;
threads_waiting_++;
cv_.Wait(&mu_);
threads_waiting_--;
}
switch (state_) {
case State::kRunning:
break;
}
case State::kShutdown:
case State::kForking:
if (!callbacks_.empty()) break;
return false;
}
GPR_ASSERT(!callbacks_.empty());
auto callback = std::move(callbacks_.front());
callbacks_.pop();
lock.Release();
callback();
return true;
}
ThreadPool::ThreadPool(int reserve_threads)
: shutdown_(false),
reserve_threads_(reserve_threads),
nthreads_(0),
threads_waiting_(0),
forking_(false) {
grpc_core::MutexLock lock(&mu_);
StartNThreadsLocked(reserve_threads_);
: reserve_threads_(reserve_threads) {
for (int i = 0; i < reserve_threads; i++) {
StartThread(state_);
}
}
void ThreadPool::StartNThreadsLocked(int n) {
for (int i = 0; i < n; i++) {
nthreads_++;
new Thread(this);
ThreadPool::~ThreadPool() { state_->queue.SetShutdown(); }
void ThreadPool::Add(absl::AnyInvocable<void()> callback) {
if (state_->queue.Add(std::move(callback))) {
StartThread(state_);
}
}
void ThreadPool::ReapThreads(std::vector<Thread*>* tlist) {
for (auto* t : *tlist) delete t;
tlist->clear();
bool ThreadPool::Queue::Add(absl::AnyInvocable<void()> callback) {
grpc_core::MutexLock lock(&mu_);
// Add works to the callbacks list
callbacks_.push(std::move(callback));
cv_.Signal();
switch (state_) {
case State::kRunning:
case State::kShutdown:
return threads_waiting_ == 0;
case State::kForking:
return false;
}
}
ThreadPool::~ThreadPool() {
void ThreadPool::Queue::SetState(State state) {
grpc_core::MutexLock lock(&mu_);
shutdown_ = true;
cv_.SignalAll();
while (nthreads_ != 0) {
shutdown_cv_.Wait(&mu_);
if (state == State::kRunning) {
GPR_ASSERT(state_ != State::kRunning);
} else {
GPR_ASSERT(state_ == State::kRunning);
}
ReapThreads(&dead_threads_);
state_ = state;
cv_.SignalAll();
}
void ThreadPool::Add(absl::AnyInvocable<void()> callback) {
void ThreadPool::ThreadCount::Add() {
grpc_core::MutexLock lock(&mu_);
// Add works to the callbacks list
callbacks_.push(std::move(callback));
// Store the callback for later if we are forking.
// TODO(hork): should we block instead?
if (forking_) return;
// Increase pool size or notify as needed
if (threads_waiting_ == 0) {
// Kick off a new thread
nthreads_++;
new Thread(this);
} else {
++threads_;
}
void ThreadPool::ThreadCount::Remove() {
grpc_core::MutexLock lock(&mu_);
--threads_;
if (threads_ == 0) {
cv_.Signal();
}
// Also use this chance to harvest dead threads
if (!dead_threads_.empty()) {
ReapThreads(&dead_threads_);
}
}
void ThreadPool::PrepareFork() {
void ThreadPool::ThreadCount::Quiesce() {
grpc_core::MutexLock lock(&mu_);
forking_ = true;
cv_.SignalAll();
while (nthreads_ != 0) {
fork_cv_.Wait(&mu_);
auto last_log = absl::Now();
while (threads_ > 0) {
// Wait for all threads to exit.
// At least once every three seconds (but no faster than once per second in
// the event of spurious wakeups) log a message indicating we're waiting to
// fork.
cv_.WaitWithTimeout(&mu_, absl::Seconds(3));
if (threads_ > 0 && absl::Now() - last_log > absl::Seconds(1)) {
gpr_log(GPR_ERROR, "Waiting for thread pool to idle before forking");
last_log = absl::Now();
}
}
ReapThreads(&dead_threads_);
}
void ThreadPool::PostforkParent() {
grpc_core::MutexLock lock(&mu_);
forking_ = false;
StartNThreadsLocked(reserve_threads_);
void ThreadPool::PrepareFork() {
state_->queue.SetForking();
state_->thread_count.Quiesce();
}
void ThreadPool::PostforkChild() {
grpc_core::MutexLock lock(&mu_);
forking_ = false;
StartNThreadsLocked(reserve_threads_);
void ThreadPool::PostforkParent() { Postfork(); }
void ThreadPool::PostforkChild() { Postfork(); }
void ThreadPool::Postfork() {
state_->queue.Reset();
for (int i = 0; i < reserve_threads_; i++) {
StartThread(state_);
}
}
} // namespace experimental

@ -21,15 +21,14 @@
#include <grpc/support/port_platform.h>
#include <memory>
#include <queue>
#include <vector>
#include "absl/base/thread_annotations.h"
#include "absl/functional/any_invocable.h"
#include "src/core/lib/event_engine/forkable.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/gprpp/thd.h"
namespace grpc_event_engine {
namespace experimental {
@ -47,32 +46,57 @@ class ThreadPool final : public grpc_event_engine::experimental::Forkable {
void PostforkChild() override;
private:
class Thread {
class Queue {
public:
explicit Thread(ThreadPool* pool);
~Thread();
explicit Queue(int reserve_threads) : reserve_threads_(reserve_threads) {}
bool Step();
void SetShutdown() { SetState(State::kShutdown); }
void SetForking() { SetState(State::kForking); }
// Add a callback to the queue.
// Return true if we should also spin up a new thread.
bool Add(absl::AnyInvocable<void()> callback);
void Reset() { SetState(State::kRunning); }
private:
ThreadPool* pool_;
grpc_core::Thread thd_;
void ThreadFunc();
enum class State { kRunning, kShutdown, kForking };
void SetState(State state);
grpc_core::Mutex mu_;
grpc_core::CondVar cv_;
std::queue<absl::AnyInvocable<void()>> callbacks_ ABSL_GUARDED_BY(mu_);
int threads_waiting_ ABSL_GUARDED_BY(mu_) = 0;
const int reserve_threads_;
State state_ ABSL_GUARDED_BY(mu_) = State::kRunning;
};
void ThreadFunc();
void StartNThreadsLocked(int n) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_);
static void ReapThreads(std::vector<Thread*>* tlist);
grpc_core::Mutex mu_;
grpc_core::CondVar cv_;
grpc_core::CondVar shutdown_cv_;
grpc_core::CondVar fork_cv_;
bool shutdown_;
std::queue<absl::AnyInvocable<void()>> callbacks_;
int reserve_threads_;
int nthreads_;
int threads_waiting_;
std::vector<Thread*> dead_threads_;
bool forking_;
class ThreadCount {
public:
void Add();
void Remove();
// Block until all threads have stopped.
void Quiesce();
private:
grpc_core::Mutex mu_;
grpc_core::CondVar cv_;
int threads_ ABSL_GUARDED_BY(mu_) = 0;
};
struct State {
explicit State(int reserve_threads) : queue(reserve_threads) {}
Queue queue;
ThreadCount thread_count;
};
using StatePtr = std::shared_ptr<State>;
static void ThreadFunc(StatePtr state);
static void StartThread(StatePtr state);
void Postfork();
const int reserve_threads_;
const StatePtr state_ = std::make_shared<State>(reserve_threads_);
};
} // namespace experimental

@ -48,6 +48,19 @@ grpc_cc_test(
],
)
grpc_cc_test(
name = "thread_pool_test",
srcs = ["thread_pool_test.cc"],
external_deps = [
"absl/synchronization",
"gtest",
],
deps = [
"//:event_engine_thread_pool",
"//:gpr",
],
)
grpc_cc_test(
name = "endpoint_config_test",
srcs = ["endpoint_config_test.cc"],

@ -0,0 +1,98 @@
// Copyright 2022 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "src/core/lib/event_engine/thread_pool.h"
#include <atomic>
#include <chrono>
#include <thread>
#include <gtest/gtest.h>
#include "absl/synchronization/notification.h"
#include "gtest/gtest.h"
#include <grpc/support/log.h>
namespace grpc_event_engine {
namespace experimental {
TEST(ThreadPoolTest, CanRunClosure) {
ThreadPool p(1);
absl::Notification n;
p.Add([&n] { n.Notify(); });
n.WaitForNotification();
}
TEST(ThreadPoolTest, CanDestroyInsideClosure) {
auto p = std::make_shared<ThreadPool>(1);
p->Add([p]() { std::this_thread::sleep_for(std::chrono::seconds(1)); });
}
TEST(ThreadPoolTest, CanSurviveFork) {
ThreadPool p(1);
absl::Notification n;
gpr_log(GPR_INFO, "add callback 1");
p.Add([&n, &p] {
std::this_thread::sleep_for(std::chrono::seconds(1));
gpr_log(GPR_INFO, "add callback 2");
p.Add([&n] {
std::this_thread::sleep_for(std::chrono::seconds(1));
gpr_log(GPR_INFO, "notify");
n.Notify();
});
});
gpr_log(GPR_INFO, "prepare fork");
p.PrepareFork();
gpr_log(GPR_INFO, "wait for notification");
n.WaitForNotification();
gpr_log(GPR_INFO, "postfork child");
p.PostforkChild();
absl::Notification n2;
gpr_log(GPR_INFO, "add callback 3");
p.Add([&n2] {
gpr_log(GPR_INFO, "notify");
n2.Notify();
});
gpr_log(GPR_INFO, "wait for notification");
n2.WaitForNotification();
}
void ScheduleSelf(ThreadPool* p) {
p->Add([p] { ScheduleSelf(p); });
}
TEST(ThreadPoolDeathTest, CanDetectStucknessAtFork) {
ASSERT_DEATH_IF_SUPPORTED(
[] {
gpr_set_log_verbosity(GPR_LOG_SEVERITY_ERROR);
ThreadPool p(1);
ScheduleSelf(&p);
std::thread terminator([] {
std::this_thread::sleep_for(std::chrono::seconds(10));
abort();
});
p.PrepareFork();
}(),
"Waiting for thread pool to idle before forking");
}
} // namespace experimental
} // namespace grpc_event_engine
int main(int argc, char** argv) {
gpr_log_verbosity_init();
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

@ -7205,6 +7205,30 @@
],
"uses_polling": true
},
{
"args": [],
"benchmark": false,
"ci_platforms": [
"linux",
"mac",
"posix",
"windows"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": true,
"language": "c++",
"name": "thread_pool_test",
"platforms": [
"linux",
"mac",
"posix",
"windows"
],
"uses_polling": true
},
{
"args": [],
"benchmark": false,

Loading…
Cancel
Save