From 108af0a94ff7519184f455328114af1a7190b9ec Mon Sep 17 00:00:00 2001 From: AJ Heller Date: Thu, 17 Aug 2023 17:00:02 -0700 Subject: [PATCH] [EventEngine] Improve lock contention in WorkStealingThreadPool (alternative) (#34065) Proposed alternative to https://github.com/grpc/grpc/pull/34024. This version has a simpler, faster busy-count implementation based on a sharded set of atomic counts: fast increment/decrement operations, relatively slower summation of total counts (which need to happen much less frequently). --- CMakeLists.txt | 4 + Makefile | 2 + Package.swift | 2 + build_autogenerated.yaml | 8 + config.m4 | 1 + config.w32 | 1 + gRPC-C++.podspec | 2 + gRPC-Core.podspec | 3 + grpc.gemspec | 2 + grpc.gyp | 3 + package.xml | 2 + src/core/BUILD | 18 ++ .../event_engine/thread_pool/thread_count.cc | 58 ++++++ .../event_engine/thread_pool/thread_count.h | 161 ++++++++++++++ .../thread_pool/work_stealing_thread_pool.cc | 108 ++-------- .../thread_pool/work_stealing_thread_pool.h | 65 +----- src/python/grpcio/grpc_core_dependencies.py | 1 + test/core/event_engine/BUILD | 1 + test/core/event_engine/thread_pool_test.cc | 196 +++++++++++++++++- tools/doxygen/Doxyfile.c++.internal | 2 + tools/doxygen/Doxyfile.core.internal | 2 + 21 files changed, 489 insertions(+), 153 deletions(-) create mode 100644 src/core/lib/event_engine/thread_pool/thread_count.cc create mode 100644 src/core/lib/event_engine/thread_pool/thread_count.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 48de270fb2d..5ba2aee3f17 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2179,6 +2179,7 @@ add_library(grpc src/core/lib/event_engine/slice_buffer.cc src/core/lib/event_engine/tcp_socket_utils.cc src/core/lib/event_engine/thread_pool/original_thread_pool.cc + src/core/lib/event_engine/thread_pool/thread_count.cc src/core/lib/event_engine/thread_pool/thread_pool_factory.cc src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc src/core/lib/event_engine/thready_event_engine/thready_event_engine.cc @@ -2883,6 +2884,7 @@ add_library(grpc_unsecure src/core/lib/event_engine/slice_buffer.cc src/core/lib/event_engine/tcp_socket_utils.cc src/core/lib/event_engine/thread_pool/original_thread_pool.cc + src/core/lib/event_engine/thread_pool/thread_count.cc src/core/lib/event_engine/thread_pool/thread_pool_factory.cc src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc src/core/lib/event_engine/thready_event_engine/thready_event_engine.cc @@ -4779,6 +4781,7 @@ add_library(grpc_authorization_provider src/core/lib/event_engine/slice_buffer.cc src/core/lib/event_engine/tcp_socket_utils.cc src/core/lib/event_engine/thread_pool/original_thread_pool.cc + src/core/lib/event_engine/thread_pool/thread_count.cc src/core/lib/event_engine/thread_pool/thread_pool_factory.cc src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc src/core/lib/event_engine/thready_event_engine/thready_event_engine.cc @@ -12071,6 +12074,7 @@ add_executable(frame_test src/core/lib/event_engine/slice_buffer.cc src/core/lib/event_engine/tcp_socket_utils.cc src/core/lib/event_engine/thread_pool/original_thread_pool.cc + src/core/lib/event_engine/thread_pool/thread_count.cc src/core/lib/event_engine/thread_pool/thread_pool_factory.cc src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc src/core/lib/event_engine/thready_event_engine/thready_event_engine.cc diff --git a/Makefile b/Makefile index 80a78f89e4e..c7f498c57de 100644 --- a/Makefile +++ b/Makefile @@ -1462,6 +1462,7 @@ LIBGRPC_SRC = \ src/core/lib/event_engine/slice_buffer.cc \ src/core/lib/event_engine/tcp_socket_utils.cc \ src/core/lib/event_engine/thread_pool/original_thread_pool.cc \ + src/core/lib/event_engine/thread_pool/thread_count.cc \ src/core/lib/event_engine/thread_pool/thread_pool_factory.cc \ src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc \ src/core/lib/event_engine/thready_event_engine/thready_event_engine.cc \ @@ -2027,6 +2028,7 @@ LIBGRPC_UNSECURE_SRC = \ src/core/lib/event_engine/slice_buffer.cc \ src/core/lib/event_engine/tcp_socket_utils.cc \ src/core/lib/event_engine/thread_pool/original_thread_pool.cc \ + src/core/lib/event_engine/thread_pool/thread_count.cc \ src/core/lib/event_engine/thread_pool/thread_pool_factory.cc \ src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc \ src/core/lib/event_engine/thready_event_engine/thready_event_engine.cc \ diff --git a/Package.swift b/Package.swift index 125f977e25f..17c4cedb05b 100644 --- a/Package.swift +++ b/Package.swift @@ -1137,6 +1137,8 @@ let package = Package( "src/core/lib/event_engine/thread_local.h", "src/core/lib/event_engine/thread_pool/original_thread_pool.cc", "src/core/lib/event_engine/thread_pool/original_thread_pool.h", + "src/core/lib/event_engine/thread_pool/thread_count.cc", + "src/core/lib/event_engine/thread_pool/thread_count.h", "src/core/lib/event_engine/thread_pool/thread_pool.h", "src/core/lib/event_engine/thread_pool/thread_pool_factory.cc", "src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc", diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index c3ba75c0a56..7e640d1fc07 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -723,6 +723,7 @@ libs: - src/core/lib/event_engine/shim.h - src/core/lib/event_engine/tcp_socket_utils.h - src/core/lib/event_engine/thread_pool/original_thread_pool.h + - src/core/lib/event_engine/thread_pool/thread_count.h - src/core/lib/event_engine/thread_pool/thread_pool.h - src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.h - src/core/lib/event_engine/thready_event_engine/thready_event_engine.h @@ -1532,6 +1533,7 @@ libs: - src/core/lib/event_engine/slice_buffer.cc - src/core/lib/event_engine/tcp_socket_utils.cc - src/core/lib/event_engine/thread_pool/original_thread_pool.cc + - src/core/lib/event_engine/thread_pool/thread_count.cc - src/core/lib/event_engine/thread_pool/thread_pool_factory.cc - src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc - src/core/lib/event_engine/thready_event_engine/thready_event_engine.cc @@ -2126,6 +2128,7 @@ libs: - src/core/lib/event_engine/shim.h - src/core/lib/event_engine/tcp_socket_utils.h - src/core/lib/event_engine/thread_pool/original_thread_pool.h + - src/core/lib/event_engine/thread_pool/thread_count.h - src/core/lib/event_engine/thread_pool/thread_pool.h - src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.h - src/core/lib/event_engine/thready_event_engine/thready_event_engine.h @@ -2542,6 +2545,7 @@ libs: - src/core/lib/event_engine/slice_buffer.cc - src/core/lib/event_engine/tcp_socket_utils.cc - src/core/lib/event_engine/thread_pool/original_thread_pool.cc + - src/core/lib/event_engine/thread_pool/thread_count.cc - src/core/lib/event_engine/thread_pool/thread_pool_factory.cc - src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc - src/core/lib/event_engine/thready_event_engine/thready_event_engine.cc @@ -4088,6 +4092,7 @@ libs: - src/core/lib/event_engine/shim.h - src/core/lib/event_engine/tcp_socket_utils.h - src/core/lib/event_engine/thread_pool/original_thread_pool.h + - src/core/lib/event_engine/thread_pool/thread_count.h - src/core/lib/event_engine/thread_pool/thread_pool.h - src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.h - src/core/lib/event_engine/thready_event_engine/thready_event_engine.h @@ -4384,6 +4389,7 @@ libs: - src/core/lib/event_engine/slice_buffer.cc - src/core/lib/event_engine/tcp_socket_utils.cc - src/core/lib/event_engine/thread_pool/original_thread_pool.cc + - src/core/lib/event_engine/thread_pool/thread_count.cc - src/core/lib/event_engine/thread_pool/thread_pool_factory.cc - src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc - src/core/lib/event_engine/thready_event_engine/thready_event_engine.cc @@ -8731,6 +8737,7 @@ targets: - src/core/lib/event_engine/shim.h - src/core/lib/event_engine/tcp_socket_utils.h - src/core/lib/event_engine/thread_pool/original_thread_pool.h + - src/core/lib/event_engine/thread_pool/thread_count.h - src/core/lib/event_engine/thread_pool/thread_pool.h - src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.h - src/core/lib/event_engine/thready_event_engine/thready_event_engine.h @@ -9009,6 +9016,7 @@ targets: - src/core/lib/event_engine/slice_buffer.cc - src/core/lib/event_engine/tcp_socket_utils.cc - src/core/lib/event_engine/thread_pool/original_thread_pool.cc + - src/core/lib/event_engine/thread_pool/thread_count.cc - src/core/lib/event_engine/thread_pool/thread_pool_factory.cc - src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc - src/core/lib/event_engine/thready_event_engine/thready_event_engine.cc diff --git a/config.m4 b/config.m4 index d3a08e8442b..52d064461ba 100644 --- a/config.m4 +++ b/config.m4 @@ -553,6 +553,7 @@ if test "$PHP_GRPC" != "no"; then src/core/lib/event_engine/tcp_socket_utils.cc \ src/core/lib/event_engine/thread_local.cc \ src/core/lib/event_engine/thread_pool/original_thread_pool.cc \ + src/core/lib/event_engine/thread_pool/thread_count.cc \ src/core/lib/event_engine/thread_pool/thread_pool_factory.cc \ src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc \ src/core/lib/event_engine/thready_event_engine/thready_event_engine.cc \ diff --git a/config.w32 b/config.w32 index 627e303923d..192db270f13 100644 --- a/config.w32 +++ b/config.w32 @@ -518,6 +518,7 @@ if (PHP_GRPC != "no") { "src\\core\\lib\\event_engine\\tcp_socket_utils.cc " + "src\\core\\lib\\event_engine\\thread_local.cc " + "src\\core\\lib\\event_engine\\thread_pool\\original_thread_pool.cc " + + "src\\core\\lib\\event_engine\\thread_pool\\thread_count.cc " + "src\\core\\lib\\event_engine\\thread_pool\\thread_pool_factory.cc " + "src\\core\\lib\\event_engine\\thread_pool\\work_stealing_thread_pool.cc " + "src\\core\\lib\\event_engine\\thready_event_engine\\thready_event_engine.cc " + diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec index e76768e6457..74e72fcab62 100644 --- a/gRPC-C++.podspec +++ b/gRPC-C++.podspec @@ -795,6 +795,7 @@ Pod::Spec.new do |s| 'src/core/lib/event_engine/tcp_socket_utils.h', 'src/core/lib/event_engine/thread_local.h', 'src/core/lib/event_engine/thread_pool/original_thread_pool.h', + 'src/core/lib/event_engine/thread_pool/thread_count.h', 'src/core/lib/event_engine/thread_pool/thread_pool.h', 'src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.h', 'src/core/lib/event_engine/thready_event_engine/thready_event_engine.h', @@ -1850,6 +1851,7 @@ Pod::Spec.new do |s| 'src/core/lib/event_engine/tcp_socket_utils.h', 'src/core/lib/event_engine/thread_local.h', 'src/core/lib/event_engine/thread_pool/original_thread_pool.h', + 'src/core/lib/event_engine/thread_pool/thread_count.h', 'src/core/lib/event_engine/thread_pool/thread_pool.h', 'src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.h', 'src/core/lib/event_engine/thready_event_engine/thready_event_engine.h', diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index 3d539352b5c..940658b9920 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -1238,6 +1238,8 @@ Pod::Spec.new do |s| 'src/core/lib/event_engine/thread_local.h', 'src/core/lib/event_engine/thread_pool/original_thread_pool.cc', 'src/core/lib/event_engine/thread_pool/original_thread_pool.h', + 'src/core/lib/event_engine/thread_pool/thread_count.cc', + 'src/core/lib/event_engine/thread_pool/thread_count.h', 'src/core/lib/event_engine/thread_pool/thread_pool.h', 'src/core/lib/event_engine/thread_pool/thread_pool_factory.cc', 'src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc', @@ -2596,6 +2598,7 @@ Pod::Spec.new do |s| 'src/core/lib/event_engine/tcp_socket_utils.h', 'src/core/lib/event_engine/thread_local.h', 'src/core/lib/event_engine/thread_pool/original_thread_pool.h', + 'src/core/lib/event_engine/thread_pool/thread_count.h', 'src/core/lib/event_engine/thread_pool/thread_pool.h', 'src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.h', 'src/core/lib/event_engine/thready_event_engine/thready_event_engine.h', diff --git a/grpc.gemspec b/grpc.gemspec index 9816b9f87b5..4ffcc56765e 100644 --- a/grpc.gemspec +++ b/grpc.gemspec @@ -1143,6 +1143,8 @@ Gem::Specification.new do |s| s.files += %w( src/core/lib/event_engine/thread_local.h ) s.files += %w( src/core/lib/event_engine/thread_pool/original_thread_pool.cc ) s.files += %w( src/core/lib/event_engine/thread_pool/original_thread_pool.h ) + s.files += %w( src/core/lib/event_engine/thread_pool/thread_count.cc ) + s.files += %w( src/core/lib/event_engine/thread_pool/thread_count.h ) s.files += %w( src/core/lib/event_engine/thread_pool/thread_pool.h ) s.files += %w( src/core/lib/event_engine/thread_pool/thread_pool_factory.cc ) s.files += %w( src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc ) diff --git a/grpc.gyp b/grpc.gyp index 1be2679b357..6f78dddb3b0 100644 --- a/grpc.gyp +++ b/grpc.gyp @@ -778,6 +778,7 @@ 'src/core/lib/event_engine/slice_buffer.cc', 'src/core/lib/event_engine/tcp_socket_utils.cc', 'src/core/lib/event_engine/thread_pool/original_thread_pool.cc', + 'src/core/lib/event_engine/thread_pool/thread_count.cc', 'src/core/lib/event_engine/thread_pool/thread_pool_factory.cc', 'src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc', 'src/core/lib/event_engine/thready_event_engine/thready_event_engine.cc', @@ -1283,6 +1284,7 @@ 'src/core/lib/event_engine/slice_buffer.cc', 'src/core/lib/event_engine/tcp_socket_utils.cc', 'src/core/lib/event_engine/thread_pool/original_thread_pool.cc', + 'src/core/lib/event_engine/thread_pool/thread_count.cc', 'src/core/lib/event_engine/thread_pool/thread_pool_factory.cc', 'src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc', 'src/core/lib/event_engine/thready_event_engine/thready_event_engine.cc', @@ -2008,6 +2010,7 @@ 'src/core/lib/event_engine/slice_buffer.cc', 'src/core/lib/event_engine/tcp_socket_utils.cc', 'src/core/lib/event_engine/thread_pool/original_thread_pool.cc', + 'src/core/lib/event_engine/thread_pool/thread_count.cc', 'src/core/lib/event_engine/thread_pool/thread_pool_factory.cc', 'src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc', 'src/core/lib/event_engine/thready_event_engine/thready_event_engine.cc', diff --git a/package.xml b/package.xml index 457bd3d0489..e96864b2a0d 100644 --- a/package.xml +++ b/package.xml @@ -1125,6 +1125,8 @@ + + diff --git a/src/core/BUILD b/src/core/BUILD index 5d99cfbd982..cba9d591d70 100644 --- a/src/core/BUILD +++ b/src/core/BUILD @@ -1508,6 +1508,23 @@ grpc_cc_library( deps = ["//:gpr_platform"], ) +grpc_cc_library( + name = "event_engine_thread_count", + srcs = [ + "lib/event_engine/thread_pool/thread_count.cc", + ], + hdrs = ["lib/event_engine/thread_pool/thread_count.h"], + external_deps = [ + "absl/base:core_headers", + "absl/time", + ], + deps = [ + "time", + "useful", + "//:gpr", + ], +) + grpc_cc_library( name = "event_engine_thread_pool", srcs = [ @@ -1529,6 +1546,7 @@ grpc_cc_library( deps = [ "common_event_engine_closures", "event_engine_basic_work_queue", + "event_engine_thread_count", "event_engine_thread_local", "event_engine_trace", "event_engine_work_queue", diff --git a/src/core/lib/event_engine/thread_pool/thread_count.cc b/src/core/lib/event_engine/thread_pool/thread_count.cc new file mode 100644 index 00000000000..875e8c68cd8 --- /dev/null +++ b/src/core/lib/event_engine/thread_pool/thread_count.cc @@ -0,0 +1,58 @@ +// Copyright 2023 The gRPC Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#include + +#include "src/core/lib/event_engine/thread_pool/thread_count.h" + +#include + +#include "absl/time/clock.h" +#include "absl/time/time.h" + +#include + +namespace grpc_event_engine { +namespace experimental { + +// -------- LivingThreadCount -------- + +void LivingThreadCount::BlockUntilThreadCount(size_t desired_threads, + const char* why) { + constexpr grpc_core::Duration log_rate = grpc_core::Duration::Seconds(3); + while (true) { + auto curr_threads = WaitForCountChange(desired_threads, log_rate); + if (curr_threads == desired_threads) break; + GRPC_LOG_EVERY_N_SEC_DELAYED( + log_rate.seconds(), GPR_DEBUG, + "Waiting for thread pool to idle before %s. (%" PRIdPTR " to %" PRIdPTR + ")", + why, curr_threads, desired_threads); + } +} + +size_t LivingThreadCount::WaitForCountChange(size_t desired_threads, + grpc_core::Duration timeout) { + size_t count; + auto deadline = absl::Now() + absl::Milliseconds(timeout.millis()); + do { + grpc_core::MutexLock lock(&mu_); + count = CountLocked(); + if (count == desired_threads) break; + cv_.WaitWithDeadline(&mu_, deadline); + } while (absl::Now() < deadline); + return count; +} + +} // namespace experimental +} // namespace grpc_event_engine diff --git a/src/core/lib/event_engine/thread_pool/thread_count.h b/src/core/lib/event_engine/thread_pool/thread_count.h new file mode 100644 index 00000000000..b61f23b5a17 --- /dev/null +++ b/src/core/lib/event_engine/thread_pool/thread_count.h @@ -0,0 +1,161 @@ +// Copyright 2023 The gRPC Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#ifndef GRPC_SRC_CORE_LIB_EVENT_ENGINE_THREAD_POOL_THREAD_COUNT_H +#define GRPC_SRC_CORE_LIB_EVENT_ENGINE_THREAD_POOL_THREAD_COUNT_H + +#include + +#include +#include +#include +#include +#include + +#include "absl/base/thread_annotations.h" + +#include + +#include "src/core/lib/gpr/useful.h" +#include "src/core/lib/gprpp/sync.h" +#include "src/core/lib/gprpp/time.h" + +namespace grpc_event_engine { +namespace experimental { + +// Tracks counts across some fixed number of shards. +// It is intended for fast increment/decrement operations, but a slower overall +// count operation. +class BusyThreadCount { + public: + // Increments a per-shard counter on construction, decrements on destruction. + class AutoThreadCounter { + public: + AutoThreadCounter(BusyThreadCount* counter, size_t idx) + : counter_(counter), idx_(idx) { + counter_->Increment(idx_); + } + ~AutoThreadCounter() { + if (counter_ != nullptr) counter_->Decrement(idx_); + } + // not copyable + AutoThreadCounter(const AutoThreadCounter&) = delete; + AutoThreadCounter& operator=(const AutoThreadCounter&) = delete; + // moveable + AutoThreadCounter(AutoThreadCounter&& other) noexcept { + counter_ = std::exchange(other.counter_, nullptr); + idx_ = other.idx_; + } + AutoThreadCounter& operator=(AutoThreadCounter&& other) noexcept { + counter_ = std::exchange(other.counter_, nullptr); + idx_ = other.idx_; + return *this; + } + + private: + BusyThreadCount* counter_; + size_t idx_; + }; + + BusyThreadCount() : shards_(grpc_core::Clamp(gpr_cpu_num_cores(), 2u, 64u)) {} + AutoThreadCounter MakeAutoThreadCounter(size_t idx) { + return AutoThreadCounter(this, idx); + }; + void Increment(size_t idx) { + shards_[idx].busy_count.fetch_add(1, std::memory_order_relaxed); + } + void Decrement(size_t idx) { + shards_[idx].busy_count.fetch_sub(1, std::memory_order_relaxed); + } + size_t count() { + return std::accumulate( + shards_.begin(), shards_.end(), 0, [](size_t running, ShardedData& d) { + return running + d.busy_count.load(std::memory_order_relaxed); + }); + } + // Returns some valid index into the per-shard data, which is rotated on every + // call to distribute load and reduce contention. + size_t NextIndex() { return next_idx_.fetch_add(1) % shards_.size(); } + + private: + struct ShardedData { + std::atomic busy_count{0}; + } GPR_ALIGN_STRUCT(GPR_CACHELINE_SIZE); + + std::vector shards_; + std::atomic next_idx_{0}; +}; + +// Tracks the number of living threads. It is intended for a fast count +// operation, with relatively slower increment/decrement operations. +class LivingThreadCount { + public: + // Increments the global counter on construction, decrements on destruction. + class AutoThreadCounter { + public: + explicit AutoThreadCounter(LivingThreadCount* counter) : counter_(counter) { + counter_->Increment(); + } + ~AutoThreadCounter() { + if (counter_ != nullptr) counter_->Decrement(); + } + // not copyable + AutoThreadCounter(const AutoThreadCounter&) = delete; + AutoThreadCounter& operator=(const AutoThreadCounter&) = delete; + // moveable + AutoThreadCounter(AutoThreadCounter&& other) noexcept { + counter_ = std::exchange(other.counter_, nullptr); + } + AutoThreadCounter& operator=(AutoThreadCounter&& other) noexcept { + counter_ = std::exchange(other.counter_, nullptr); + return *this; + } + + private: + LivingThreadCount* counter_; + }; + + AutoThreadCounter MakeAutoThreadCounter() { return AutoThreadCounter(this); }; + void Increment() ABSL_LOCKS_EXCLUDED(mu_) { + grpc_core::MutexLock lock(&mu_); + ++living_count_; + cv_.SignalAll(); + } + void Decrement() ABSL_LOCKS_EXCLUDED(mu_) { + grpc_core::MutexLock lock(&mu_); + --living_count_; + cv_.SignalAll(); + } + void BlockUntilThreadCount(size_t desired_threads, const char* why) + ABSL_LOCKS_EXCLUDED(mu_); + size_t count() ABSL_LOCKS_EXCLUDED(mu_) { + grpc_core::MutexLock lock(&mu_); + return CountLocked(); + } + + private: + size_t CountLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) { + return living_count_; + } + size_t WaitForCountChange(size_t desired_threads, + grpc_core::Duration timeout); + + grpc_core::Mutex mu_; + grpc_core::CondVar cv_ ABSL_GUARDED_BY(mu_); + size_t living_count_ ABSL_GUARDED_BY(mu_) = 0; +}; + +} // namespace experimental +} // namespace grpc_event_engine + +#endif // GRPC_SRC_CORE_LIB_EVENT_ENGINE_THREAD_POOL_THREAD_COUNT_H diff --git a/src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc b/src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc index 29eefb2facf..6f95deac9eb 100644 --- a/src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc +++ b/src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc @@ -209,9 +209,9 @@ void WorkStealingThreadPool::WorkStealingThreadPoolImpl::Quiesce() { // until all other threads have exited, so we need to wait for just one thread // running instead of zero. bool is_threadpool_thread = g_local_queue != nullptr; - thread_count()->BlockUntilThreadCount(CounterType::kLivingThreadCount, - is_threadpool_thread ? 1 : 0, - "shutting down", work_signal()); + work_signal()->SignalAll(); + living_thread_count_.BlockUntilThreadCount(is_threadpool_thread ? 1 : 0, + "shutting down"); GPR_ASSERT(queue_.Empty()); quiesced_.store(true, std::memory_order_relaxed); lifeguard_.BlockUntilShutdownAndReset(); @@ -249,8 +249,8 @@ bool WorkStealingThreadPool::WorkStealingThreadPoolImpl::IsQuiesced() { void WorkStealingThreadPool::WorkStealingThreadPoolImpl::PrepareFork() { SetForking(true); - thread_count()->BlockUntilThreadCount(CounterType::kLivingThreadCount, 0, - "forking", &work_signal_); + work_signal_.SignalAll(); + living_thread_count_.BlockUntilThreadCount(0, "forking"); lifeguard_.BlockUntilShutdownAndReset(); } @@ -329,12 +329,9 @@ void WorkStealingThreadPool::WorkStealingThreadPoolImpl::Lifeguard:: // No new threads are started when forking. // No new work is done when forking needs to begin. if (pool_->forking_.load()) return; - int busy_thread_count = - pool_->thread_count_.GetCount(CounterType::kBusyCount); - int living_thread_count = - pool_->thread_count_.GetCount(CounterType::kLivingThreadCount); + const auto living_thread_count = pool_->living_thread_count()->count(); // Wake an idle worker thread if there's global work to be had. - if (busy_thread_count < living_thread_count) { + if (pool_->busy_thread_count()->count() < living_thread_count) { if (!pool_->queue_.Empty()) { pool_->work_signal()->Signal(); backoff_.Reset(); @@ -357,7 +354,8 @@ void WorkStealingThreadPool::WorkStealingThreadPoolImpl::Lifeguard:: // queue, nor any work to steal. Add more sophisticated logic about when to // start a thread. GRPC_EVENT_ENGINE_TRACE( - "Starting new ThreadPool thread due to backlog (total threads: %d)", + "Starting new ThreadPool thread due to backlog (total threads: %" PRIuPTR + ")", living_thread_count + 1); pool_->StartThread(); // Tell the lifeguard to monitor the pool more closely. @@ -369,12 +367,13 @@ void WorkStealingThreadPool::WorkStealingThreadPoolImpl::Lifeguard:: WorkStealingThreadPool::ThreadState::ThreadState( std::shared_ptr pool) : pool_(std::move(pool)), - auto_thread_count_(pool_->thread_count(), - CounterType::kLivingThreadCount), + auto_thread_counter_( + pool_->living_thread_count()->MakeAutoThreadCounter()), backoff_(grpc_core::BackOff::Options() .set_initial_backoff(kWorkerThreadMinSleepBetweenChecks) .set_max_backoff(kWorkerThreadMaxSleepBetweenChecks) - .set_multiplier(1.3)) {} + .set_multiplier(1.3)), + busy_count_idx_(pool_->busy_thread_count()->NextIndex()) {} void WorkStealingThreadPool::ThreadState::ThreadBody() { g_local_queue = new BasicWorkQueue(); @@ -412,8 +411,8 @@ bool WorkStealingThreadPool::ThreadState::Step() { auto* closure = g_local_queue->PopMostRecent(); // If local work is available, run it. if (closure != nullptr) { - ThreadCount::AutoThreadCount auto_busy{pool_->thread_count(), - CounterType::kBusyCount}; + auto busy = + pool_->busy_thread_count()->MakeAutoThreadCounter(busy_count_idx_); closure->Run(); return true; } @@ -450,8 +449,7 @@ bool WorkStealingThreadPool::ThreadState::Step() { // Quit a thread if the pool has more than it requires, and this thread // has been idle long enough. if (timed_out && - pool_->thread_count()->GetCount(CounterType::kLivingThreadCount) > - pool_->reserve_threads() && + pool_->living_thread_count()->count() > pool_->reserve_threads() && grpc_core::Timestamp::Now() - start_time > kIdleThreadLimit) { return false; } @@ -462,8 +460,8 @@ bool WorkStealingThreadPool::ThreadState::Step() { return false; } if (closure != nullptr) { - ThreadCount::AutoThreadCount auto_busy{pool_->thread_count(), - CounterType::kBusyCount}; + auto busy = + pool_->busy_thread_count()->MakeAutoThreadCounter(busy_count_idx_); closure->Run(); } backoff_.Reset(); @@ -472,8 +470,8 @@ bool WorkStealingThreadPool::ThreadState::Step() { void WorkStealingThreadPool::ThreadState::FinishDraining() { // The thread is definitionally busy while draining - ThreadCount::AutoThreadCount auto_busy{pool_->thread_count(), - CounterType::kBusyCount}; + auto busy = + pool_->busy_thread_count()->MakeAutoThreadCounter(busy_count_idx_); // If a fork occurs at any point during shutdown, quit draining. The post-fork // threads will finish draining the global queue. while (!pool_->IsForking()) { @@ -495,72 +493,6 @@ void WorkStealingThreadPool::ThreadState::FinishDraining() { } } -// -------- WorkStealingThreadPool::ThreadCount -------- - -void WorkStealingThreadPool::ThreadCount::Add(CounterType counter_type) { - grpc_core::MutexLock lock(&wait_mu_[counter_type]); - ++thread_counts_[counter_type]; - wait_cv_[counter_type].SignalAll(); -} - -void WorkStealingThreadPool::ThreadCount::Remove(CounterType counter_type) { - grpc_core::MutexLock lock(&wait_mu_[counter_type]); - --thread_counts_[counter_type]; - wait_cv_[counter_type].SignalAll(); -} - -void WorkStealingThreadPool::ThreadCount::BlockUntilThreadCount( - CounterType counter_type, size_t desired_threads, const char* why, - WorkSignal* work_signal) { - // Wait for all threads to exit. - work_signal->SignalAll(); - while (true) { - auto curr_threads = WaitForCountChange( - counter_type, desired_threads, - grpc_core::Duration::Seconds(kBlockingQuiesceLogRateSeconds)); - if (curr_threads == desired_threads) break; - GRPC_LOG_EVERY_N_SEC_DELAYED( - kBlockingQuiesceLogRateSeconds, GPR_DEBUG, - "Waiting for thread pool to idle before %s. (%" PRIdPTR " to %" PRIdPTR - ")", - why, curr_threads, desired_threads); - } -} - -size_t WorkStealingThreadPool::ThreadCount::WaitForCountChange( - CounterType counter_type, size_t desired_threads, - grpc_core::Duration timeout) { - size_t count; - auto deadline = absl::Now() + absl::Milliseconds(timeout.millis()); - do { - grpc_core::MutexLock lock(&wait_mu_[counter_type]); - count = GetCountLocked(counter_type); - if (count == desired_threads) break; - wait_cv_[counter_type].WaitWithDeadline(&wait_mu_[counter_type], deadline); - } while (absl::Now() < deadline); - return count; -} - -size_t WorkStealingThreadPool::ThreadCount::GetCount(CounterType counter_type) { - grpc_core::MutexLock lock(&wait_mu_[counter_type]); - return GetCountLocked(counter_type); -} - -size_t WorkStealingThreadPool::ThreadCount::GetCountLocked( - CounterType counter_type) { - return thread_counts_[counter_type]; -} - -WorkStealingThreadPool::ThreadCount::AutoThreadCount::AutoThreadCount( - ThreadCount* counter, CounterType counter_type) - : counter_(counter), counter_type_(counter_type) { - counter_->Add(counter_type_); -} - -WorkStealingThreadPool::ThreadCount::AutoThreadCount::~AutoThreadCount() { - counter_->Remove(counter_type_); -} - // -------- WorkStealingThreadPool::WorkSignal -------- void WorkStealingThreadPool::WorkSignal::Signal() { diff --git a/src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.h b/src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.h index 96afc8ab540..2fc646b84eb 100644 --- a/src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.h +++ b/src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.h @@ -33,6 +33,7 @@ #include #include "src/core/lib/backoff/backoff.h" +#include "src/core/lib/event_engine/thread_pool/thread_count.h" #include "src/core/lib/event_engine/thread_pool/thread_pool.h" #include "src/core/lib/event_engine/work_queue/basic_work_queue.h" #include "src/core/lib/event_engine/work_queue/work_queue.h" @@ -76,56 +77,6 @@ class WorkStealingThreadPool final : public ThreadPool { grpc_core::CondVar cv_ ABSL_GUARDED_BY(mu_); }; - // Types of thread counts. - // Note this is intentionally not an enum class, the keys are used as indexes - // into the ThreadCount's private array. - enum CounterType { - kLivingThreadCount = 0, - kBusyCount, - }; - - class ThreadCount { - public: - // Adds 1 to the thread count for that counter type. - void Add(CounterType counter_type) - ABSL_LOCKS_EXCLUDED(wait_mu_[counter_type]); - // Subtracts 1 from the thread count for that counter type. - void Remove(CounterType counter_type) - ABSL_LOCKS_EXCLUDED(wait_mu_[counter_type]); - // Blocks until the thread count for that type reaches `desired_threads`. - void BlockUntilThreadCount(CounterType counter_type, size_t desired_threads, - const char* why, WorkSignal* work_signal) - ABSL_LOCKS_EXCLUDED(wait_mu_[counter_type]); - // Returns the current thread count for the tracked type. - size_t GetCount(CounterType counter_type) - ABSL_LOCKS_EXCLUDED(wait_mu_[counter_type]); - // Returns the current thread count for the tracked type. - size_t GetCountLocked(CounterType counter_type) - ABSL_EXCLUSIVE_LOCKS_REQUIRED(wait_mu_[counter_type]); - - // Adds and removes thread counts on construction and destruction - class AutoThreadCount { - public: - AutoThreadCount(ThreadCount* counter, CounterType counter_type); - ~AutoThreadCount(); - - private: - ThreadCount* counter_; - CounterType counter_type_; - }; - - private: - // Wait for the desired count to be reached. - // Returns the current thread count either when the desired count is - // reached, or when the deadline has passed, whichever happens first. - size_t WaitForCountChange(CounterType counter_type, size_t desired_threads, - grpc_core::Duration timeout); - - grpc_core::Mutex wait_mu_[2]; - grpc_core::CondVar wait_cv_[2]; - size_t thread_counts_[2]{0, 0}; - }; - // A pool of WorkQueues that participate in work stealing. // // Every worker thread registers and unregisters its thread-local thread pool @@ -186,7 +137,8 @@ class WorkStealingThreadPool final : public ThreadPool { bool IsForking(); bool IsQuiesced(); size_t reserve_threads() { return reserve_threads_; } - ThreadCount* thread_count() { return &thread_count_; } + BusyThreadCount* busy_thread_count() { return &busy_thread_count_; } + LivingThreadCount* living_thread_count() { return &living_thread_count_; } TheftRegistry* theft_registry() { return &theft_registry_; } WorkQueue* queue() { return &queue_; } WorkSignal* work_signal() { return &work_signal_; } @@ -221,7 +173,8 @@ class WorkStealingThreadPool final : public ThreadPool { }; const size_t reserve_threads_; - ThreadCount thread_count_; + BusyThreadCount busy_thread_count_; + LivingThreadCount living_thread_count_; TheftRegistry theft_registry_; BasicWorkQueue queue_; // Track shutdown and fork bits separately. @@ -254,11 +207,11 @@ class WorkStealingThreadPool final : public ThreadPool { // is decremented at time of destruction. This is necessary when this thread // state holds the last shared_ptr keeping the pool alive. std::shared_ptr pool_; - // auto_thread_count_ must be the second member declared, so that the thread - // count is decremented after all other state is cleaned up (preventing - // leaks). - ThreadCount::AutoThreadCount auto_thread_count_; + // auto_thread_counter_ must be declared after pool_, so that the thread + // count is decremented after all other pool state is cleaned up. + LivingThreadCount::AutoThreadCounter auto_thread_counter_; grpc_core::BackOff backoff_; + size_t busy_count_idx_; }; const std::shared_ptr pool_; diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index 85eb99bbeba..935f005225e 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -527,6 +527,7 @@ CORE_SOURCE_FILES = [ 'src/core/lib/event_engine/tcp_socket_utils.cc', 'src/core/lib/event_engine/thread_local.cc', 'src/core/lib/event_engine/thread_pool/original_thread_pool.cc', + 'src/core/lib/event_engine/thread_pool/thread_count.cc', 'src/core/lib/event_engine/thread_pool/thread_pool_factory.cc', 'src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc', 'src/core/lib/event_engine/thready_event_engine/thready_event_engine.cc', diff --git a/test/core/event_engine/BUILD b/test/core/event_engine/BUILD index e2a3d992626..a5441e2b962 100644 --- a/test/core/event_engine/BUILD +++ b/test/core/event_engine/BUILD @@ -60,6 +60,7 @@ grpc_cc_test( deps = [ "//:gpr", "//:grpc", + "//src/core:event_engine_thread_count", "//src/core:event_engine_thread_pool", "//src/core:notification", "//test/core/util:grpc_test_util_unsecure", diff --git a/test/core/event_engine/thread_pool_test.cc b/test/core/event_engine/thread_pool_test.cc index 8aa86c14a8f..1bf97989897 100644 --- a/test/core/event_engine/thread_pool_test.cc +++ b/test/core/event_engine/thread_pool_test.cc @@ -14,11 +14,13 @@ #include "src/core/lib/event_engine/thread_pool/thread_pool.h" +#include #include #include #include #include #include +#include #include "absl/time/clock.h" #include "absl/time/time.h" @@ -27,6 +29,7 @@ #include #include "src/core/lib/event_engine/thread_pool/original_thread_pool.h" +#include "src/core/lib/event_engine/thread_pool/thread_count.h" #include "src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.h" #include "src/core/lib/gprpp/notification.h" #include "src/core/lib/gprpp/thd.h" @@ -112,7 +115,7 @@ TYPED_TEST(ThreadPoolTest, ForkStressTest) { } runcount.fetch_add(1, std::memory_order_relaxed); }; - for (int i = 0; i < expected_runcount; i++) { + for (auto i = 0; i < expected_runcount; i++) { pool.Run(inner_fn); } // simulate multiple forks at a fixed frequency @@ -141,7 +144,7 @@ TYPED_TEST(ThreadPoolTest, StartQuiesceRaceStressTest) { std::unique_ptr pool; int i; }; - for (int i = 0; i < iter_count; i++) { + for (auto i = 0; i < iter_count; i++) { ThdState state{std::make_unique(8), i}; state.pool->PrepareFork(); grpc_core::Thread t1( @@ -209,7 +212,7 @@ TEST_F(WorkStealingThreadPoolTest, ScalesWhenBackloggedFromGlobalQueue) { // Ensures the pool is saturated before signaling closures to continue. std::atomic waiters{0}; std::atomic signaled{false}; - for (int i = 0; i < pool_thread_count; i++) { + for (auto i = 0; i < pool_thread_count; i++) { p.Run([&]() { waiters.fetch_add(1); while (!signaled.load()) { @@ -232,7 +235,7 @@ TEST_F(WorkStealingThreadPoolTest, ScalesWhenBackloggedFromGlobalQueue) { // is fixed, or the implementation is deleted, make this a typed test again. TEST_F(WorkStealingThreadPoolTest, ScalesWhenBackloggedFromSingleThreadLocalQueue) { - int pool_thread_count = 8; + constexpr int pool_thread_count = 8; WorkStealingThreadPool p(pool_thread_count); grpc_core::Notification signal; // Ensures the pool is saturated before signaling closures to continue. @@ -262,18 +265,193 @@ TEST_F(WorkStealingThreadPoolTest, // pool, it takes around 50s to run. When that is fixed, or the implementation // is deleted, make this a typed test again. TEST_F(WorkStealingThreadPoolTest, QuiesceRaceStressTest) { - int cycle_count = 333; - int thread_count = 8; - int run_count = thread_count * 2; - for (int i = 0; i < cycle_count; i++) { + constexpr int cycle_count = 333; + constexpr int thread_count = 8; + constexpr int run_count = thread_count * 2; + for (auto i = 0; i < cycle_count; i++) { WorkStealingThreadPool p(thread_count); - for (int j = 0; j < run_count; j++) { + for (auto j = 0; j < run_count; j++) { p.Run([]() {}); } p.Quiesce(); } } +class BusyThreadCountTest : public testing::Test {}; + +TEST_F(BusyThreadCountTest, StressTest) { + // Spawns a large number of threads to concurrently increments/decrement the + // counters, and request count totals. Magic numbers were tuned for tests to + // run in a reasonable amount of time. + constexpr size_t thread_count = 300; + constexpr int run_count = 1000; + constexpr int increment_by = 50; + BusyThreadCount busy_thread_count; + grpc_core::Notification stop_counting; + std::thread counter_thread([&]() { + while (!stop_counting.HasBeenNotified()) { + busy_thread_count.count(); + } + }); + std::vector threads; + threads.reserve(thread_count); + for (size_t i = 0; i < thread_count; i++) { + threads.emplace_back([&]() { + for (int j = 0; j < run_count; j++) { + // Get a new index for every iteration. + // This is not the intended use, but further stress tests the NextIndex + // function. + auto thread_idx = busy_thread_count.NextIndex(); + for (int inc = 0; inc < increment_by; inc++) { + busy_thread_count.Increment(thread_idx); + } + for (int inc = 0; inc < increment_by; inc++) { + busy_thread_count.Decrement(thread_idx); + } + } + }); + } + for (auto& thd : threads) thd.join(); + stop_counting.Notify(); + counter_thread.join(); + ASSERT_EQ(busy_thread_count.count(), 0); +} + +TEST_F(BusyThreadCountTest, AutoCountStressTest) { + // Spawns a large number of threads to concurrently increments/decrement the + // counters, and request count totals. Magic numbers were tuned for tests to + // run in a reasonable amount of time. + constexpr size_t thread_count = 150; + constexpr int run_count = 1000; + constexpr int increment_by = 30; + BusyThreadCount busy_thread_count; + grpc_core::Notification stop_counting; + std::thread counter_thread([&]() { + while (!stop_counting.HasBeenNotified()) { + busy_thread_count.count(); + } + }); + std::vector threads; + threads.reserve(thread_count); + for (size_t i = 0; i < thread_count; i++) { + threads.emplace_back([&]() { + for (int j = 0; j < run_count; j++) { + std::vector auto_counters; + auto_counters.reserve(increment_by); + for (int ctr_count = 0; ctr_count < increment_by; ctr_count++) { + auto_counters.push_back(busy_thread_count.MakeAutoThreadCounter( + busy_thread_count.NextIndex())); + } + } + }); + } + for (auto& thd : threads) thd.join(); + stop_counting.Notify(); + counter_thread.join(); + ASSERT_EQ(busy_thread_count.count(), 0); +} + +class LivingThreadCountTest : public testing::Test {}; + +TEST_F(LivingThreadCountTest, StressTest) { + // Spawns a large number of threads to concurrently increments/decrement the + // counters, and request count totals. Magic numbers were tuned for tests to + // run in a reasonable amount of time. + constexpr size_t thread_count = 50; + constexpr int run_count = 1000; + constexpr int increment_by = 10; + LivingThreadCount living_thread_count; + grpc_core::Notification stop_counting; + std::thread counter_thread([&]() { + while (!stop_counting.HasBeenNotified()) { + living_thread_count.count(); + } + }); + std::vector threads; + threads.reserve(thread_count); + for (size_t i = 0; i < thread_count; i++) { + threads.emplace_back([&]() { + for (int j = 0; j < run_count; j++) { + // Get a new index for every iteration. + // This is not the intended use, but further stress tests the NextIndex + // function. + for (int inc = 0; inc < increment_by; inc++) { + living_thread_count.Increment(); + } + for (int inc = 0; inc < increment_by; inc++) { + living_thread_count.Decrement(); + } + } + }); + } + for (auto& thd : threads) thd.join(); + stop_counting.Notify(); + counter_thread.join(); + ASSERT_EQ(living_thread_count.count(), 0); +} + +TEST_F(LivingThreadCountTest, AutoCountStressTest) { + // Spawns a large number of threads to concurrently increments/decrement the + // counters, and request count totals. Magic numbers were tuned for tests to + // run in a reasonable amount of time. + constexpr size_t thread_count = 50; + constexpr int run_count = 1000; + constexpr int increment_by = 10; + LivingThreadCount living_thread_count; + grpc_core::Notification stop_counting; + std::thread counter_thread([&]() { + while (!stop_counting.HasBeenNotified()) { + living_thread_count.count(); + } + }); + std::vector threads; + threads.reserve(thread_count); + for (size_t i = 0; i < thread_count; i++) { + threads.emplace_back([&]() { + for (int j = 0; j < run_count; j++) { + std::vector auto_counters; + auto_counters.reserve(increment_by); + for (int ctr_count = 0; ctr_count < increment_by; ctr_count++) { + auto_counters.push_back(living_thread_count.MakeAutoThreadCounter()); + } + } + }); + } + for (auto& thd : threads) thd.join(); + stop_counting.Notify(); + counter_thread.join(); + ASSERT_EQ(living_thread_count.count(), 0); +} + +TEST_F(LivingThreadCountTest, BlockUntilThreadCountTest) { + constexpr size_t thread_count = 100; + grpc_core::Notification waiting; + LivingThreadCount living_thread_count; + std::vector threads; + threads.reserve(thread_count); + // Start N living threads + for (size_t i = 0; i < thread_count; i++) { + threads.emplace_back([&]() { + auto alive = living_thread_count.MakeAutoThreadCounter(); + waiting.WaitForNotification(); + }); + } + // Join in a separate thread + std::thread joiner([&]() { + waiting.Notify(); + for (auto& thd : threads) thd.join(); + }); + { + auto alive = living_thread_count.MakeAutoThreadCounter(); + living_thread_count.BlockUntilThreadCount(1, + "block until 1 thread remains"); + } + living_thread_count.BlockUntilThreadCount(0, + "block until all threads are gone"); + joiner.join(); + ASSERT_EQ(living_thread_count.count(), 0); +} + } // namespace experimental } // namespace grpc_event_engine diff --git a/tools/doxygen/Doxyfile.c++.internal b/tools/doxygen/Doxyfile.c++.internal index 852a157e368..e08e77e05a4 100644 --- a/tools/doxygen/Doxyfile.c++.internal +++ b/tools/doxygen/Doxyfile.c++.internal @@ -2140,6 +2140,8 @@ src/core/lib/event_engine/thread_local.cc \ src/core/lib/event_engine/thread_local.h \ src/core/lib/event_engine/thread_pool/original_thread_pool.cc \ src/core/lib/event_engine/thread_pool/original_thread_pool.h \ +src/core/lib/event_engine/thread_pool/thread_count.cc \ +src/core/lib/event_engine/thread_pool/thread_count.h \ src/core/lib/event_engine/thread_pool/thread_pool.h \ src/core/lib/event_engine/thread_pool/thread_pool_factory.cc \ src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc \ diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal index a61cf0a9f98..166a2822a07 100644 --- a/tools/doxygen/Doxyfile.core.internal +++ b/tools/doxygen/Doxyfile.core.internal @@ -1918,6 +1918,8 @@ src/core/lib/event_engine/thread_local.cc \ src/core/lib/event_engine/thread_local.h \ src/core/lib/event_engine/thread_pool/original_thread_pool.cc \ src/core/lib/event_engine/thread_pool/original_thread_pool.h \ +src/core/lib/event_engine/thread_pool/thread_count.cc \ +src/core/lib/event_engine/thread_pool/thread_count.h \ src/core/lib/event_engine/thread_pool/thread_pool.h \ src/core/lib/event_engine/thread_pool/thread_pool_factory.cc \ src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc \