Add a quiesce to threadpool (#31380)

* Add a quiesce to threadpool

* fix

* Automated change: Fix sanity tests

* fix

Co-authored-by: ctiller <ctiller@users.noreply.github.com>
Craig Tiller committed 2 years ago (via GitHub)
branch pull/31394/head · parent 9285209a13 · commit 0a8948ca2a
7 changed files (lines changed):
  1  src/core/lib/event_engine/posix_engine/posix_engine.cc
  8  src/core/lib/event_engine/thread_pool.cc
  6  src/core/lib/event_engine/thread_pool.h
  1  src/core/lib/event_engine/windows/windows_engine.cc
  4  test/core/event_engine/thread_pool_test.cc
  6  test/core/event_engine/windows/iocp_test.cc
  2  test/core/event_engine/windows/win_socket_test.cc
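
The shape of the change: blocking shutdown moves out of ~ThreadPool() into an explicit Quiesce() method, and every owner of a pool (the POSIX and Windows event engines, plus each test) now calls Quiesce() before letting the pool destruct. A minimal sketch of the resulting contract, assuming the grpc_event_engine::experimental namespace these files live in:

  using grpc_event_engine::experimental::ThreadPool;

  ThreadPool pool;
  pool.Run([] { /* scheduled work */ });
  pool.Quiesce();  // blocks until queued work drains and worker threads exit
  // ~ThreadPool() no longer blocks; it only asserts that Quiesce() ran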

src/core/lib/event_engine/posix_engine/posix_engine.cc
@@ -61,6 +61,7 @@ PosixEventEngine::~PosixEventEngine() {
     }
   }
   GPR_ASSERT(GPR_LIKELY(known_handles_.empty()));
+  executor_.Quiesce();
 }
 
 bool PosixEventEngine::Cancel(EventEngine::TaskHandle handle) {

src/core/lib/event_engine/thread_pool.cc
@@ -147,7 +147,7 @@ ThreadPool::ThreadPool() {
   }
 }
 
-ThreadPool::~ThreadPool() {
+void ThreadPool::Quiesce() {
   state_->queue.SetShutdown();
   // Wait until all threads are exited.
   // Note that if this is a threadpool thread then we won't exit this thread
@@ -155,9 +155,15 @@ ThreadPool::~ThreadPool() {
   // thread running instead of zero.
   state_->thread_count.BlockUntilThreadCount(g_threadpool_thread ? 1 : 0,
                                              "shutting down");
+  quiesced_.store(true, std::memory_order_relaxed);
+}
+
+ThreadPool::~ThreadPool() {
+  GPR_ASSERT(quiesced_.load(std::memory_order_relaxed));
 }
 
 void ThreadPool::Run(absl::AnyInvocable<void()> callback) {
+  GPR_DEBUG_ASSERT(quiesced_.load(std::memory_order_relaxed) == false);
   if (state_->queue.Add(std::move(callback))) {
     StartThread(state_, StartThreadReason::kNoWaitersWhenScheduling);
   }
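
Two details in the implementation above are worth calling out. First, Quiesce() waits for a thread count of one rather than zero when invoked from a pool thread, so a callback can quiesce the pool it is running on. Second, the quiesced_ flag uses relaxed memory ordering because it only backs the misuse asserts; the actual shutdown synchronization comes from BlockUntilThreadCount(). A standalone sketch of the same quiesce-before-destroy pattern (illustrative only, not gRPC code):

  #include <atomic>
  #include <cassert>
  #include <thread>
  #include <vector>

  class Pool {
   public:
    explicit Pool(unsigned n) {
      for (unsigned i = 0; i < n; ++i) {
        threads_.emplace_back([] { /* a real worker would drain a queue */ });
      }
    }
    // Explicit blocking shutdown, analogous to ThreadPool::Quiesce().
    void Quiesce() {
      for (auto& t : threads_) t.join();  // stands in for BlockUntilThreadCount
      quiesced_.store(true, std::memory_order_relaxed);
    }
    // Destruction is only legal after Quiesce(), mirroring the GPR_ASSERT.
    ~Pool() { assert(quiesced_.load(std::memory_order_relaxed)); }

   private:
    std::vector<std::thread> threads_;
    std::atomic<bool> quiesced_{false};
  };

  int main() {
    Pool p(2);
    p.Quiesce();  // skipping this would trip the assert in ~Pool()
  }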

src/core/lib/event_engine/thread_pool.h
@@ -44,9 +44,12 @@ namespace experimental {
 class ThreadPool final : public Forkable, public Executor {
  public:
   ThreadPool();
-  // Ensures the thread pool is empty before destroying it.
+  // Asserts Quiesce was called.
   ~ThreadPool() override;
 
+  void Quiesce();
+
+  // Run must not be called after Quiesce completes
   void Run(absl::AnyInvocable<void()> callback) override;
   void Run(EventEngine::Closure* closure) override;
 
@@ -125,6 +128,7 @@ class ThreadPool final : public Forkable, public Executor {
   const unsigned reserve_threads_ =
       grpc_core::Clamp(gpr_cpu_num_cores(), 2u, 32u);
   const StatePtr state_ = std::make_shared<State>(reserve_threads_);
+  std::atomic<bool> quiesced_{false};
 };
 
 }  // namespace experimental
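
Read together, the comments above pin down the lifecycle: Run() is legal only until Quiesce() completes, and destruction is legal only after it. A hypothetical sketch of the misuses the two asserts are designed to catch (names taken from the declarations above; not code from this change):

  void DestroyWithoutQuiesce() {
    ThreadPool pool;
  }  // aborts: the destructor's GPR_ASSERT sees quiesced_ == false

  void RunAfterQuiesce() {
    ThreadPool pool;
    pool.Quiesce();
    pool.Run([] {});  // debug builds abort via the GPR_DEBUG_ASSERT in Run()
  }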

src/core/lib/event_engine/windows/windows_engine.cc
@@ -77,6 +77,7 @@ WindowsEventEngine::~WindowsEventEngine() {
   }
   GPR_ASSERT(GPR_LIKELY(known_handles_.empty()));
   GPR_ASSERT(WSACleanup() == 0);
+  executor_.Quiesce();
 }
 
 bool WindowsEventEngine::Cancel(EventEngine::TaskHandle handle) {

test/core/event_engine/thread_pool_test.cc
@@ -33,6 +33,7 @@ TEST(ThreadPoolTest, CanRunClosure) {
   grpc_core::Notification n;
   p.Run([&n] { n.Notify(); });
   n.WaitForNotification();
+  p.Quiesce();
 }
 
 TEST(ThreadPoolTest, CanDestroyInsideClosure) {
@@ -41,6 +42,7 @@ TEST(ThreadPoolTest, CanDestroyInsideClosure) {
   p->Run([p, &n]() mutable {
     std::this_thread::sleep_for(std::chrono::seconds(1));
     // This should delete the thread pool and not deadlock
+    p->Quiesce();
     p.reset();
     n.Notify();
   });
@@ -76,6 +78,7 @@ TEST(ThreadPoolTest, CanSurviveFork) {
   });
   gpr_log(GPR_INFO, "wait for notification");
   n2.WaitForNotification();
+  p.Quiesce();
 }
 
 void ScheduleSelf(ThreadPool* p) {
@@ -110,6 +113,7 @@ TEST(ThreadPoolTest, CanStartLotsOfClosures) {
   // Our first thread pool implementation tried to create ~1M threads for this
   // test.
   ScheduleTwiceUntilZero(&p, 20);
+  p.Quiesce();
 }
 
 }  // namespace experimental

test/core/event_engine/windows/iocp_test.cc
@@ -134,6 +134,7 @@ TEST_F(IOCPTest, ClientReceivesNotificationOfServerSend) {
   wrapped_server_socket->MaybeShutdown(absl::OkStatus());
   delete wrapped_client_socket;
   delete wrapped_server_socket;
+  executor.Quiesce();
 }
 
 TEST_F(IOCPTest, IocpWorkTimeoutDueToNoNotificationRegistered) {
@@ -198,6 +199,7 @@ TEST_F(IOCPTest, IocpWorkTimeoutDueToNoNotificationRegistered) {
   delete on_read;
   wrapped_client_socket->MaybeShutdown(absl::OkStatus());
   delete wrapped_client_socket;
+  executor.Quiesce();
 }
 
 TEST_F(IOCPTest, KickWorks) {
@@ -219,6 +221,7 @@ TEST_F(IOCPTest, KickWorks) {
   });
   // wait for the callbacks to run
   kicked.WaitForNotification();
+  executor.Quiesce();
 }
 
 TEST_F(IOCPTest, KickThenShutdownCasusesNextWorkerToBeKicked) {
@@ -245,6 +248,7 @@ TEST_F(IOCPTest, KickThenShutdownCasusesNextWorkerToBeKicked) {
       [&cb_invoked]() { cb_invoked = true; });
   ASSERT_TRUE(result == Poller::WorkResult::kDeadlineExceeded);
   ASSERT_FALSE(cb_invoked);
+  executor.Quiesce();
 }
 
 TEST_F(IOCPTest, CrashOnWatchingAClosedSocket) {
@@ -259,6 +263,7 @@ TEST_F(IOCPTest, CrashOnWatchingAClosedSocket) {
         static_cast<WinSocket*>(iocp.Watch(sockpair[0]));
       },
       "");
+  executor.Quiesce();
 }
 
 TEST_F(IOCPTest, StressTestThousandsOfSockets) {
@@ -334,6 +339,7 @@ TEST_F(IOCPTest, StressTestThousandsOfSockets) {
         }
       }
       iocp_worker.join();
+      executor.Quiesce();
     });
   }
   for (auto& t : threads) {

test/core/event_engine/windows/win_socket_test.cc
@@ -63,6 +63,7 @@ TEST_F(WinSocketTest, ManualReadEventTriggeredWithoutIO) {
   ASSERT_TRUE(read_called);
   wrapped_client_socket.MaybeShutdown(absl::CancelledError("done"));
   wrapped_server_socket.MaybeShutdown(absl::CancelledError("done"));
+  executor.Quiesce();
 }
 
 TEST_F(WinSocketTest, NotificationCalledImmediatelyOnShutdownWinSocket) {
@@ -87,6 +88,7 @@ TEST_F(WinSocketTest, NotificationCalledImmediatelyOnShutdownWinSocket) {
   }
   ASSERT_TRUE(read_called);
   closesocket(sockpair[1]);
+  executor.Quiesce();
 }
 
 int main(int argc, char** argv) {
