diff --git a/BUILD b/BUILD
index 3c9dec3b245..96342999d8a 100644
--- a/BUILD
+++ b/BUILD
@@ -2466,6 +2466,7 @@ grpc_cc_library(
     external_deps = [
         "absl/base:core_headers",
         "absl/functional:any_invocable",
+        "absl/time",
     ],
     deps = [
         "forkable",
diff --git a/CMakeLists.txt b/CMakeLists.txt
index a478579336b..ccda5fbbef7 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -1200,6 +1200,7 @@ if(gRPC_BUILD_TESTS)
   add_dependencies(buildtests_cxx test_cpp_util_time_test)
   add_dependencies(buildtests_cxx thd_test)
   add_dependencies(buildtests_cxx thread_manager_test)
+  add_dependencies(buildtests_cxx thread_pool_test)
   add_dependencies(buildtests_cxx thread_quota_test)
   if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX)
     add_dependencies(buildtests_cxx thread_stress_test)
@@ -18298,6 +18299,45 @@ target_link_libraries(thread_manager_test
 )
 
 
+endif()
+if(gRPC_BUILD_TESTS)
+
+add_executable(thread_pool_test
+  src/core/lib/event_engine/forkable.cc
+  src/core/lib/event_engine/thread_pool.cc
+  test/core/event_engine/thread_pool_test.cc
+  third_party/googletest/googletest/src/gtest-all.cc
+  third_party/googletest/googlemock/src/gmock-all.cc
+)
+
+target_include_directories(thread_pool_test
+  PRIVATE
+    ${CMAKE_CURRENT_SOURCE_DIR}
+    ${CMAKE_CURRENT_SOURCE_DIR}/include
+    ${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
+    ${_gRPC_RE2_INCLUDE_DIR}
+    ${_gRPC_SSL_INCLUDE_DIR}
+    ${_gRPC_UPB_GENERATED_DIR}
+    ${_gRPC_UPB_GRPC_GENERATED_DIR}
+    ${_gRPC_UPB_INCLUDE_DIR}
+    ${_gRPC_XXHASH_INCLUDE_DIR}
+    ${_gRPC_ZLIB_INCLUDE_DIR}
+    third_party/googletest/googletest/include
+    third_party/googletest/googletest
+    third_party/googletest/googlemock/include
+    third_party/googletest/googlemock
+    ${_gRPC_PROTO_GENS_DIR}
+)
+
+target_link_libraries(thread_pool_test
+  ${_gRPC_PROTOBUF_LIBRARIES}
+  ${_gRPC_ALLTARGETS_LIBRARIES}
+  absl::flat_hash_set
+  absl::any_invocable
+  gpr
+)
+
+
 endif()
 if(gRPC_BUILD_TESTS)
 
diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml
index 389e017985f..5d6ef262ef1 100644
--- a/build_autogenerated.yaml
+++ b/build_autogenerated.yaml
@@ -9985,6 +9985,21 @@ targets:
   deps:
   - grpc++_test_config
   - grpc++_test_util
+- name: thread_pool_test
+  gtest: true
+  build: test
+  language: c++
+  headers:
+  - src/core/lib/event_engine/forkable.h
+  - src/core/lib/event_engine/thread_pool.h
+  src:
+  - src/core/lib/event_engine/forkable.cc
+  - src/core/lib/event_engine/thread_pool.cc
+  - test/core/event_engine/thread_pool_test.cc
+  deps:
+  - absl/container:flat_hash_set
+  - absl/functional:any_invocable
+  - gpr
 - name: thread_quota_test
   gtest: true
   build: test
diff --git a/src/core/lib/event_engine/thread_pool.cc b/src/core/lib/event_engine/thread_pool.cc
index ddc6c713597..93521ccd660 100644
--- a/src/core/lib/event_engine/thread_pool.cc
+++ b/src/core/lib/event_engine/thread_pool.cc
@@ -20,138 +20,148 @@
 
 #include "src/core/lib/event_engine/thread_pool.h"
 
+#include <memory>
 #include <utility>
 
+#include "absl/time/clock.h"
+#include "absl/time/time.h"
+
+#include <grpc/support/log.h>
+
 #include "src/core/lib/gprpp/thd.h"
 
 namespace grpc_event_engine {
 namespace experimental {
 
-ThreadPool::Thread::Thread(ThreadPool* pool)
-    : pool_(pool),
-      thd_(
-          "posix_eventengine_pool",
-          [](void* th) { static_cast<ThreadPool::Thread*>(th)->ThreadFunc(); },
-          this, nullptr, grpc_core::Thread::Options().set_tracked(false)) {
-  thd_.Start();
+void ThreadPool::StartThread(StatePtr state) {
+  state->thread_count.Add();
+  grpc_core::Thread(
+      "event_engine",
+      [](void* arg) {
+        ThreadFunc(*std::unique_ptr<StatePtr>(static_cast<StatePtr*>(arg)));
+      },
+      new StatePtr(state), nullptr,
+      grpc_core::Thread::Options().set_tracked(false).set_joinable(false))
+      .Start();
 }
-ThreadPool::Thread::~Thread() { thd_.Join(); }
-
-void ThreadPool::Thread::ThreadFunc() {
-  pool_->ThreadFunc();
-  // Now that we have killed ourselves, we should reduce the thread count
-  grpc_core::MutexLock lock(&pool_->mu_);
-  pool_->nthreads_--;
-  // Move ourselves to dead list
-  pool_->dead_threads_.push_back(this);
-
-  if (pool_->nthreads_ == 0) {
-    if (pool_->forking_) pool_->fork_cv_.Signal();
-    if (pool_->shutdown_) pool_->shutdown_cv_.Signal();
+
+void ThreadPool::ThreadFunc(StatePtr state) {
+  while (state->queue.Step()) {
   }
+  state->thread_count.Remove();
 }
 
-void ThreadPool::ThreadFunc() {
-  for (;;) {
-    // Wait until work is available or we are shutting down.
-    grpc_core::ReleasableMutexLock lock(&mu_);
-    if (!forking_ && !shutdown_ && callbacks_.empty()) {
-      // If there are too many threads waiting, then quit this thread
-      if (threads_waiting_ >= reserve_threads_) {
-        break;
-      }
-      threads_waiting_++;
-      cv_.Wait(&mu_);
-      threads_waiting_--;
-    }
-    // a fork could be initiated while the thread was waiting
-    if (forking_) return;
-    // Drain callbacks before considering shutdown to ensure all work
-    // gets completed.
-    if (!callbacks_.empty()) {
-      auto cb = std::move(callbacks_.front());
-      callbacks_.pop();
-      lock.Release();
-      cb();
-    } else if (shutdown_) {
+bool ThreadPool::Queue::Step() {
+  grpc_core::ReleasableMutexLock lock(&mu_);
+  // Wait until work is available or we are shutting down.
+  while (state_ == State::kRunning && callbacks_.empty()) {
+    // If there are too many threads waiting, then quit this thread.
+    // TODO(ctiller): wait some time in this case to be sure.
+    if (threads_waiting_ >= reserve_threads_) return false;
+    threads_waiting_++;
+    cv_.Wait(&mu_);
+    threads_waiting_--;
+  }
+  switch (state_) {
+    case State::kRunning:
       break;
-    }
+    case State::kShutdown:
+    case State::kForking:
+      if (!callbacks_.empty()) break;
+      return false;
   }
+  GPR_ASSERT(!callbacks_.empty());
+  auto callback = std::move(callbacks_.front());
+  callbacks_.pop();
+  lock.Release();
+  callback();
+  return true;
 }
 
 ThreadPool::ThreadPool(int reserve_threads)
-    : shutdown_(false),
-      reserve_threads_(reserve_threads),
-      nthreads_(0),
-      threads_waiting_(0),
-      forking_(false) {
-  grpc_core::MutexLock lock(&mu_);
-  StartNThreadsLocked(reserve_threads_);
+    : reserve_threads_(reserve_threads) {
+  for (int i = 0; i < reserve_threads; i++) {
+    StartThread(state_);
+  }
 }
 
-void ThreadPool::StartNThreadsLocked(int n) {
-  for (int i = 0; i < n; i++) {
-    nthreads_++;
-    new Thread(this);
+ThreadPool::~ThreadPool() { state_->queue.SetShutdown(); }
+
+void ThreadPool::Add(absl::AnyInvocable<void()> callback) {
+  if (state_->queue.Add(std::move(callback))) {
+    StartThread(state_);
   }
 }
 
-void ThreadPool::ReapThreads(std::vector<Thread*>* tlist) {
-  for (auto* t : *tlist) delete t;
-  tlist->clear();
+bool ThreadPool::Queue::Add(absl::AnyInvocable<void()> callback) {
+  grpc_core::MutexLock lock(&mu_);
+  // Add works to the callbacks list
+  callbacks_.push(std::move(callback));
+  cv_.Signal();
+  switch (state_) {
+    case State::kRunning:
+    case State::kShutdown:
+      return threads_waiting_ == 0;
+    case State::kForking:
+      return false;
+  }
+  GPR_UNREACHABLE_CODE(return false);
 }
 
-ThreadPool::~ThreadPool() {
+void ThreadPool::Queue::SetState(State state) {
   grpc_core::MutexLock lock(&mu_);
-  shutdown_ = true;
-  cv_.SignalAll();
-  while (nthreads_ != 0) {
-    shutdown_cv_.Wait(&mu_);
+  if (state == State::kRunning) {
+    GPR_ASSERT(state_ != State::kRunning);
+  } else {
+    GPR_ASSERT(state_ == State::kRunning);
   }
-  ReapThreads(&dead_threads_);
+  state_ = state;
+  cv_.SignalAll();
 }
 
-void ThreadPool::Add(absl::AnyInvocable<void()> callback) {
+void ThreadPool::ThreadCount::Add() {
   grpc_core::MutexLock lock(&mu_);
-  // Add works to the callbacks list
-  callbacks_.push(std::move(callback));
-  // Store the callback for later if we are forking.
-  // TODO(hork): should we block instead?
-  if (forking_) return;
-  // Increase pool size or notify as needed
-  if (threads_waiting_ == 0) {
-    // Kick off a new thread
-    nthreads_++;
-    new Thread(this);
-  } else {
+  ++threads_;
+}
+
+void ThreadPool::ThreadCount::Remove() {
+  grpc_core::MutexLock lock(&mu_);
+  --threads_;
+  if (threads_ == 0) {
     cv_.Signal();
   }
-  // Also use this chance to harvest dead threads
-  if (!dead_threads_.empty()) {
-    ReapThreads(&dead_threads_);
-  }
 }
 
-void ThreadPool::PrepareFork() {
+void ThreadPool::ThreadCount::Quiesce() {
   grpc_core::MutexLock lock(&mu_);
-  forking_ = true;
-  cv_.SignalAll();
-  while (nthreads_ != 0) {
-    fork_cv_.Wait(&mu_);
+  auto last_log = absl::Now();
+  while (threads_ > 0) {
+    // Wait for all threads to exit.
+    // At least once every three seconds (but no faster than once per second in
+    // the event of spurious wakeups) log a message indicating we're waiting to
+    // fork.
+    cv_.WaitWithTimeout(&mu_, absl::Seconds(3));
+    if (threads_ > 0 && absl::Now() - last_log > absl::Seconds(1)) {
+      gpr_log(GPR_ERROR, "Waiting for thread pool to idle before forking");
+      last_log = absl::Now();
+    }
   }
-  ReapThreads(&dead_threads_);
 }
 
-void ThreadPool::PostforkParent() {
-  grpc_core::MutexLock lock(&mu_);
-  forking_ = false;
-  StartNThreadsLocked(reserve_threads_);
+void ThreadPool::PrepareFork() {
+  state_->queue.SetForking();
+  state_->thread_count.Quiesce();
 }
 
-void ThreadPool::PostforkChild() {
-  grpc_core::MutexLock lock(&mu_);
-  forking_ = false;
-  StartNThreadsLocked(reserve_threads_);
+void ThreadPool::PostforkParent() { Postfork(); }
+
+void ThreadPool::PostforkChild() { Postfork(); }
+
+void ThreadPool::Postfork() {
+  state_->queue.Reset();
+  for (int i = 0; i < reserve_threads_; i++) {
+    StartThread(state_);
+  }
 }
 
 }  // namespace experimental
diff --git a/src/core/lib/event_engine/thread_pool.h b/src/core/lib/event_engine/thread_pool.h
index 6613279cc5c..dad1f282922 100644
--- a/src/core/lib/event_engine/thread_pool.h
+++ b/src/core/lib/event_engine/thread_pool.h
@@ -21,15 +21,14 @@
 
 #include <grpc/support/port_platform.h>
 
+#include <memory>
 #include <queue>
-#include <vector>
 
 #include "absl/base/thread_annotations.h"
 #include "absl/functional/any_invocable.h"
 
 #include "src/core/lib/event_engine/forkable.h"
 #include "src/core/lib/gprpp/sync.h"
-#include "src/core/lib/gprpp/thd.h"
 
 namespace grpc_event_engine {
 namespace experimental {
@@ -47,32 +46,57 @@ class ThreadPool final : public grpc_event_engine::experimental::Forkable {
   void PostforkChild() override;
 
  private:
-  class Thread {
+  class Queue {
    public:
-    explicit Thread(ThreadPool* pool);
-    ~Thread();
+    explicit Queue(int reserve_threads) : reserve_threads_(reserve_threads) {}
+    bool Step();
+    void SetShutdown() { SetState(State::kShutdown); }
+    void SetForking() { SetState(State::kForking); }
+    // Add a callback to the queue.
+    // Return true if we should also spin up a new thread.
+    bool Add(absl::AnyInvocable<void()> callback);
+    void Reset() { SetState(State::kRunning); }
 
    private:
-    ThreadPool* pool_;
-    grpc_core::Thread thd_;
-    void ThreadFunc();
+    enum class State { kRunning, kShutdown, kForking };
+
+    void SetState(State state);
+
+    grpc_core::Mutex mu_;
+    grpc_core::CondVar cv_;
+    std::queue<absl::AnyInvocable<void()>> callbacks_ ABSL_GUARDED_BY(mu_);
+    int threads_waiting_ ABSL_GUARDED_BY(mu_) = 0;
+    const int reserve_threads_;
+    State state_ ABSL_GUARDED_BY(mu_) = State::kRunning;
   };
 
-  void ThreadFunc();
-  void StartNThreadsLocked(int n) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_);
-  static void ReapThreads(std::vector<Thread*>* tlist);
-
-  grpc_core::Mutex mu_;
-  grpc_core::CondVar cv_;
-  grpc_core::CondVar shutdown_cv_;
-  grpc_core::CondVar fork_cv_;
-  bool shutdown_;
-  std::queue<absl::AnyInvocable<void()>> callbacks_;
-  int reserve_threads_;
-  int nthreads_;
-  int threads_waiting_;
-  std::vector<Thread*> dead_threads_;
-  bool forking_;
+  class ThreadCount {
+   public:
+    void Add();
+    void Remove();
+    // Block until all threads have stopped.
+    void Quiesce();
+
+   private:
+    grpc_core::Mutex mu_;
+    grpc_core::CondVar cv_;
+    int threads_ ABSL_GUARDED_BY(mu_) = 0;
+  };
+
+  struct State {
+    explicit State(int reserve_threads) : queue(reserve_threads) {}
+    Queue queue;
+    ThreadCount thread_count;
+  };
+
+  using StatePtr = std::shared_ptr<State>;
+
+  static void ThreadFunc(StatePtr state);
+  static void StartThread(StatePtr state);
+  void Postfork();
+
+  const int reserve_threads_;
+  const StatePtr state_ = std::make_shared<State>(reserve_threads_);
 };
 
 }  // namespace experimental
diff --git a/test/core/event_engine/BUILD b/test/core/event_engine/BUILD
index dea38abe6bc..53ac8b38218 100644
--- a/test/core/event_engine/BUILD
+++ b/test/core/event_engine/BUILD
@@ -48,6 +48,19 @@ grpc_cc_test(
     ],
 )
 
+grpc_cc_test(
+    name = "thread_pool_test",
+    srcs = ["thread_pool_test.cc"],
+    external_deps = [
+        "absl/synchronization",
+        "gtest",
+    ],
+    deps = [
+        "//:event_engine_thread_pool",
+        "//:gpr",
+    ],
+)
+
 grpc_cc_test(
     name = "endpoint_config_test",
     srcs = ["endpoint_config_test.cc"],
diff --git a/test/core/event_engine/thread_pool_test.cc b/test/core/event_engine/thread_pool_test.cc
new file mode 100644
index 00000000000..8af63bcc193
--- /dev/null
+++ b/test/core/event_engine/thread_pool_test.cc
@@ -0,0 +1,98 @@
+// Copyright 2022 The gRPC Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "src/core/lib/event_engine/thread_pool.h"
+
+#include <atomic>
+#include <chrono>
+#include <thread>
+
+#include <gtest/gtest.h>
+
+#include "absl/synchronization/notification.h"
+#include "gtest/gtest.h"
+
+#include <grpc/support/log.h>
+
+namespace grpc_event_engine {
+namespace experimental {
+
+TEST(ThreadPoolTest, CanRunClosure) {
+  ThreadPool p(1);
+  absl::Notification n;
+  p.Add([&n] { n.Notify(); });
+  n.WaitForNotification();
+}
+
+TEST(ThreadPoolTest, CanDestroyInsideClosure) {
+  auto p = std::make_shared<ThreadPool>(1);
+  p->Add([p]() { std::this_thread::sleep_for(std::chrono::seconds(1)); });
+}
+
+TEST(ThreadPoolTest, CanSurviveFork) {
+  ThreadPool p(1);
+  absl::Notification n;
+  gpr_log(GPR_INFO, "add callback 1");
+  p.Add([&n, &p] {
+    std::this_thread::sleep_for(std::chrono::seconds(1));
+    gpr_log(GPR_INFO, "add callback 2");
+    p.Add([&n] {
+      std::this_thread::sleep_for(std::chrono::seconds(1));
+      gpr_log(GPR_INFO, "notify");
+      n.Notify();
+    });
+  });
+  gpr_log(GPR_INFO, "prepare fork");
+  p.PrepareFork();
+  gpr_log(GPR_INFO, "wait for notification");
+  n.WaitForNotification();
+  gpr_log(GPR_INFO, "postfork child");
+  p.PostforkChild();
+  absl::Notification n2;
+  gpr_log(GPR_INFO, "add callback 3");
+  p.Add([&n2] {
+    gpr_log(GPR_INFO, "notify");
+    n2.Notify();
+  });
+  gpr_log(GPR_INFO, "wait for notification");
+  n2.WaitForNotification();
+}
+
+void ScheduleSelf(ThreadPool* p) {
+  p->Add([p] { ScheduleSelf(p); });
+}
+
+TEST(ThreadPoolDeathTest, CanDetectStucknessAtFork) {
+  ASSERT_DEATH_IF_SUPPORTED(
+      [] {
+        gpr_set_log_verbosity(GPR_LOG_SEVERITY_ERROR);
+        ThreadPool p(1);
+        ScheduleSelf(&p);
+        std::thread terminator([] {
+          std::this_thread::sleep_for(std::chrono::seconds(10));
+          abort();
+        });
+        p.PrepareFork();
+      }(),
+      "Waiting for thread pool to idle before forking");
+}
+
+}  // namespace experimental
+}  // namespace grpc_event_engine
+
+int main(int argc, char** argv) {
+  gpr_log_verbosity_init();
+  ::testing::InitGoogleTest(&argc, argv);
+  return RUN_ALL_TESTS();
+}
diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json
index 57f30f9ed27..0a0ac9971b8 100644
--- a/tools/run_tests/generated/tests.json
+++ b/tools/run_tests/generated/tests.json
@@ -7205,6 +7205,30 @@
     ],
     "uses_polling": true
   },
+  {
+    "args": [],
+    "benchmark": false,
+    "ci_platforms": [
+      "linux",
+      "mac",
+      "posix",
+      "windows"
+    ],
+    "cpu_cost": 1.0,
+    "exclude_configs": [],
+    "exclude_iomgrs": [],
+    "flaky": false,
+    "gtest": true,
+    "language": "c++",
+    "name": "thread_pool_test",
+    "platforms": [
+      "linux",
+      "mac",
+      "posix",
+      "windows"
+    ],
+    "uses_polling": true
+  },
   {
     "args": [],
     "benchmark": false,