From 0a8948ca2af2473d3478b97a606298d35c33e5d0 Mon Sep 17 00:00:00 2001
From: Craig Tiller <ctiller@google.com>
Date: Tue, 18 Oct 2022 13:31:54 -0700
Subject: [PATCH] Add a quiesce to threadpool (#31380)

* Add a ThreadPool::Quiesce() method that shuts down the work queue and
  blocks until the worker threads have exited, instead of doing this
  implicitly in the destructor.

* ~ThreadPool() now asserts that Quiesce() was called, and Run() asserts in
  debug builds that it is not called after Quiesce() has completed.

* Call executor_.Quiesce() from the PosixEventEngine and WindowsEventEngine
  destructors and from the affected event engine tests.

* Automated change: Fix sanity tests

Co-authored-by: ctiller <ctiller@users.noreply.github.com>
---
 src/core/lib/event_engine/posix_engine/posix_engine.cc | 1 +
 src/core/lib/event_engine/thread_pool.cc               | 8 +++++++-
 src/core/lib/event_engine/thread_pool.h                | 6 +++++-
 src/core/lib/event_engine/windows/windows_engine.cc    | 1 +
 test/core/event_engine/thread_pool_test.cc             | 4 ++++
 test/core/event_engine/windows/iocp_test.cc            | 6 ++++++
 test/core/event_engine/windows/win_socket_test.cc      | 2 ++
 7 files changed, 26 insertions(+), 2 deletions(-)
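
A minimal usage sketch of the contract this patch introduces, written as
hypothetical caller code (it assumes the grpc_event_engine::experimental
namespace and the ThreadPool API declared in thread_pool.h): the owner of a
pool must now call Quiesce() before destroying it, and must not call Run()
afterwards.

  #include "src/core/lib/event_engine/thread_pool.h"

  void UsePool() {
    grpc_event_engine::experimental::ThreadPool pool;
    // Run() may only be called before Quiesce() completes.
    pool.Run([] { /* do some work */ });
    // New with this patch: explicitly shut down the queue and join the
    // worker threads before the pool is destroyed.
    pool.Quiesce();
    // ~ThreadPool() now only asserts that Quiesce() was called.
  }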

diff --git a/src/core/lib/event_engine/posix_engine/posix_engine.cc b/src/core/lib/event_engine/posix_engine/posix_engine.cc
index eb5508cdae2..cded900a0fa 100644
--- a/src/core/lib/event_engine/posix_engine/posix_engine.cc
+++ b/src/core/lib/event_engine/posix_engine/posix_engine.cc
@@ -61,6 +61,7 @@ PosixEventEngine::~PosixEventEngine() {
     }
   }
   GPR_ASSERT(GPR_LIKELY(known_handles_.empty()));
+  executor_.Quiesce();
 }
 
 bool PosixEventEngine::Cancel(EventEngine::TaskHandle handle) {
diff --git a/src/core/lib/event_engine/thread_pool.cc b/src/core/lib/event_engine/thread_pool.cc
index 774122bd005..fc77f58eea7 100644
--- a/src/core/lib/event_engine/thread_pool.cc
+++ b/src/core/lib/event_engine/thread_pool.cc
@@ -147,7 +147,7 @@ ThreadPool::ThreadPool() {
   }
 }
 
-ThreadPool::~ThreadPool() {
+void ThreadPool::Quiesce() {
   state_->queue.SetShutdown();
   // Wait until all threads are exited.
   // Note that if this is a threadpool thread then we won't exit this thread
@@ -155,9 +155,15 @@ ThreadPool::~ThreadPool() {
   // thread running instead of zero.
   state_->thread_count.BlockUntilThreadCount(g_threadpool_thread ? 1 : 0,
                                              "shutting down");
+  quiesced_.store(true, std::memory_order_relaxed);
+}
+
+ThreadPool::~ThreadPool() {
+  GPR_ASSERT(quiesced_.load(std::memory_order_relaxed));
 }
 
 void ThreadPool::Run(absl::AnyInvocable<void()> callback) {
+  GPR_DEBUG_ASSERT(quiesced_.load(std::memory_order_relaxed) == false);
   if (state_->queue.Add(std::move(callback))) {
     StartThread(state_, StartThreadReason::kNoWaitersWhenScheduling);
   }
diff --git a/src/core/lib/event_engine/thread_pool.h b/src/core/lib/event_engine/thread_pool.h
index 4b93b5a443a..a67714e78d4 100644
--- a/src/core/lib/event_engine/thread_pool.h
+++ b/src/core/lib/event_engine/thread_pool.h
@@ -44,9 +44,12 @@ namespace experimental {
 class ThreadPool final : public Forkable, public Executor {
  public:
   ThreadPool();
-  // Ensures the thread pool is empty before destroying it.
+  // Asserts Quiesce was called.
   ~ThreadPool() override;
 
+  void Quiesce();
+
+  // Run must not be called after Quiesce completes
   void Run(absl::AnyInvocable<void()> callback) override;
   void Run(EventEngine::Closure* closure) override;
 
@@ -125,6 +128,7 @@ class ThreadPool final : public Forkable, public Executor {
   const unsigned reserve_threads_ =
       grpc_core::Clamp(gpr_cpu_num_cores(), 2u, 32u);
   const StatePtr state_ = std::make_shared<State>(reserve_threads_);
+  std::atomic<bool> quiesced_{false};
 };
 
 }  // namespace experimental
diff --git a/src/core/lib/event_engine/windows/windows_engine.cc b/src/core/lib/event_engine/windows/windows_engine.cc
index 22500290371..7ec77267774 100644
--- a/src/core/lib/event_engine/windows/windows_engine.cc
+++ b/src/core/lib/event_engine/windows/windows_engine.cc
@@ -77,6 +77,7 @@ WindowsEventEngine::~WindowsEventEngine() {
   }
   GPR_ASSERT(GPR_LIKELY(known_handles_.empty()));
   GPR_ASSERT(WSACleanup() == 0);
+  executor_.Quiesce();
 }
 
 bool WindowsEventEngine::Cancel(EventEngine::TaskHandle handle) {
diff --git a/test/core/event_engine/thread_pool_test.cc b/test/core/event_engine/thread_pool_test.cc
index 605600ba375..e3057d308ab 100644
--- a/test/core/event_engine/thread_pool_test.cc
+++ b/test/core/event_engine/thread_pool_test.cc
@@ -33,6 +33,7 @@ TEST(ThreadPoolTest, CanRunClosure) {
   grpc_core::Notification n;
   p.Run([&n] { n.Notify(); });
   n.WaitForNotification();
+  p.Quiesce();
 }
 
 TEST(ThreadPoolTest, CanDestroyInsideClosure) {
@@ -41,6 +42,7 @@ TEST(ThreadPoolTest, CanDestroyInsideClosure) {
   p->Run([p, &n]() mutable {
     std::this_thread::sleep_for(std::chrono::seconds(1));
     // This should delete the thread pool and not deadlock
+    p->Quiesce();
     p.reset();
     n.Notify();
   });
@@ -76,6 +78,7 @@ TEST(ThreadPoolTest, CanSurviveFork) {
   });
   gpr_log(GPR_INFO, "wait for notification");
   n2.WaitForNotification();
+  p.Quiesce();
 }
 
 void ScheduleSelf(ThreadPool* p) {
@@ -110,6 +113,7 @@ TEST(ThreadPoolTest, CanStartLotsOfClosures) {
   // Our first thread pool implementation tried to create ~1M threads for this
   // test.
   ScheduleTwiceUntilZero(&p, 20);
+  p.Quiesce();
 }
 
 }  // namespace experimental
diff --git a/test/core/event_engine/windows/iocp_test.cc b/test/core/event_engine/windows/iocp_test.cc
index 1736446d3d4..d1e22988434 100644
--- a/test/core/event_engine/windows/iocp_test.cc
+++ b/test/core/event_engine/windows/iocp_test.cc
@@ -134,6 +134,7 @@ TEST_F(IOCPTest, ClientReceivesNotificationOfServerSend) {
   wrapped_server_socket->MaybeShutdown(absl::OkStatus());
   delete wrapped_client_socket;
   delete wrapped_server_socket;
+  executor.Quiesce();
 }
 
 TEST_F(IOCPTest, IocpWorkTimeoutDueToNoNotificationRegistered) {
@@ -198,6 +199,7 @@ TEST_F(IOCPTest, IocpWorkTimeoutDueToNoNotificationRegistered) {
   delete on_read;
   wrapped_client_socket->MaybeShutdown(absl::OkStatus());
   delete wrapped_client_socket;
+  executor.Quiesce();
 }
 
 TEST_F(IOCPTest, KickWorks) {
@@ -219,6 +221,7 @@ TEST_F(IOCPTest, KickWorks) {
   });
   // wait for the callbacks to run
   kicked.WaitForNotification();
+  executor.Quiesce();
 }
 
 TEST_F(IOCPTest, KickThenShutdownCasusesNextWorkerToBeKicked) {
@@ -245,6 +248,7 @@ TEST_F(IOCPTest, KickThenShutdownCasusesNextWorkerToBeKicked) {
                      [&cb_invoked]() { cb_invoked = true; });
   ASSERT_TRUE(result == Poller::WorkResult::kDeadlineExceeded);
   ASSERT_FALSE(cb_invoked);
+  executor.Quiesce();
 }
 
 TEST_F(IOCPTest, CrashOnWatchingAClosedSocket) {
@@ -259,6 +263,7 @@ TEST_F(IOCPTest, CrashOnWatchingAClosedSocket) {
             static_cast<WinSocket*>(iocp.Watch(sockpair[0]));
       },
       "");
+  executor.Quiesce();
 }
 
 TEST_F(IOCPTest, StressTestThousandsOfSockets) {
@@ -334,6 +339,7 @@ TEST_F(IOCPTest, StressTestThousandsOfSockets) {
         }
       }
       iocp_worker.join();
+      executor.Quiesce();
     });
   }
   for (auto& t : threads) {
diff --git a/test/core/event_engine/windows/win_socket_test.cc b/test/core/event_engine/windows/win_socket_test.cc
index 5b73f5ce14c..e9c474bbeb4 100644
--- a/test/core/event_engine/windows/win_socket_test.cc
+++ b/test/core/event_engine/windows/win_socket_test.cc
@@ -63,6 +63,7 @@ TEST_F(WinSocketTest, ManualReadEventTriggeredWithoutIO) {
   ASSERT_TRUE(read_called);
   wrapped_client_socket.MaybeShutdown(absl::CancelledError("done"));
   wrapped_server_socket.MaybeShutdown(absl::CancelledError("done"));
+  executor.Quiesce();
 }
 
 TEST_F(WinSocketTest, NotificationCalledImmediatelyOnShutdownWinSocket) {
@@ -87,6 +88,7 @@ TEST_F(WinSocketTest, NotificationCalledImmediatelyOnShutdownWinSocket) {
   }
   ASSERT_TRUE(read_called);
   closesocket(sockpair[1]);
+  executor.Quiesce();
 }
 
 int main(int argc, char** argv) {
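
A second, rougher sketch (hypothetical code, modeled on the updated
CanDestroyInsideClosure test): Quiesce() can also be called from a closure
running on the pool itself. In that case it waits until this is the only
pool thread still running (one thread rather than zero) and then returns,
so the closure can release the last reference without deadlocking.

  #include <memory>
  #include "src/core/lib/event_engine/thread_pool.h"

  using grpc_event_engine::experimental::ThreadPool;

  void QuiesceFromPoolThread() {
    auto pool = std::make_shared<ThreadPool>();
    pool->Run([pool]() mutable {
      // Runs on a pool thread; Quiesce() waits for the other workers to
      // exit and then returns while this thread is still running.
      pool->Quiesce();
      pool.reset();  // may destroy the ThreadPool from inside the closure
    });
    // Drop the caller's reference; the closure's copy keeps the pool alive.
    // (The real test also uses a grpc_core::Notification to wait for the
    // closure to finish.)
    pool.reset();
  }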