[EventEngine] Improve lock contention in WorkStealingThreadPool (alternative) (#34065)

Proposed alternative to https://github.com/grpc/grpc/pull/34024.

This version has a simpler, faster busy-count implementation based on a
sharded set of atomic counts: fast increment/decrement operations and a
relatively slower summation of the total count (which needs to happen much
less frequently).
pull/33175/head
AJ Heller 1 year ago committed by GitHub
parent b85b57fdc7
commit 108af0a94f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      CMakeLists.txt
  2. 2
      Makefile
  3. 2
      Package.swift
  4. 8
      build_autogenerated.yaml
  5. 1
      config.m4
  6. 1
      config.w32
  7. 2
      gRPC-C++.podspec
  8. 3
      gRPC-Core.podspec
  9. 2
      grpc.gemspec
  10. 3
      grpc.gyp
  11. 2
      package.xml
  12. 18
      src/core/BUILD
  13. 58
      src/core/lib/event_engine/thread_pool/thread_count.cc
  14. 161
      src/core/lib/event_engine/thread_pool/thread_count.h
  15. 108
      src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc
  16. 65
      src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.h
  17. 1
      src/python/grpcio/grpc_core_dependencies.py
  18. 1
      test/core/event_engine/BUILD
  19. 196
      test/core/event_engine/thread_pool_test.cc
  20. 2
      tools/doxygen/Doxyfile.c++.internal
  21. 2
      tools/doxygen/Doxyfile.core.internal

4
CMakeLists.txt generated

@ -2179,6 +2179,7 @@ add_library(grpc
src/core/lib/event_engine/slice_buffer.cc
src/core/lib/event_engine/tcp_socket_utils.cc
src/core/lib/event_engine/thread_pool/original_thread_pool.cc
src/core/lib/event_engine/thread_pool/thread_count.cc
src/core/lib/event_engine/thread_pool/thread_pool_factory.cc
src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc
src/core/lib/event_engine/thready_event_engine/thready_event_engine.cc
@ -2883,6 +2884,7 @@ add_library(grpc_unsecure
src/core/lib/event_engine/slice_buffer.cc
src/core/lib/event_engine/tcp_socket_utils.cc
src/core/lib/event_engine/thread_pool/original_thread_pool.cc
src/core/lib/event_engine/thread_pool/thread_count.cc
src/core/lib/event_engine/thread_pool/thread_pool_factory.cc
src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc
src/core/lib/event_engine/thready_event_engine/thready_event_engine.cc
@ -4779,6 +4781,7 @@ add_library(grpc_authorization_provider
src/core/lib/event_engine/slice_buffer.cc
src/core/lib/event_engine/tcp_socket_utils.cc
src/core/lib/event_engine/thread_pool/original_thread_pool.cc
src/core/lib/event_engine/thread_pool/thread_count.cc
src/core/lib/event_engine/thread_pool/thread_pool_factory.cc
src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc
src/core/lib/event_engine/thready_event_engine/thready_event_engine.cc
@ -12071,6 +12074,7 @@ add_executable(frame_test
src/core/lib/event_engine/slice_buffer.cc
src/core/lib/event_engine/tcp_socket_utils.cc
src/core/lib/event_engine/thread_pool/original_thread_pool.cc
src/core/lib/event_engine/thread_pool/thread_count.cc
src/core/lib/event_engine/thread_pool/thread_pool_factory.cc
src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc
src/core/lib/event_engine/thready_event_engine/thready_event_engine.cc

2
Makefile generated

@ -1462,6 +1462,7 @@ LIBGRPC_SRC = \
src/core/lib/event_engine/slice_buffer.cc \
src/core/lib/event_engine/tcp_socket_utils.cc \
src/core/lib/event_engine/thread_pool/original_thread_pool.cc \
src/core/lib/event_engine/thread_pool/thread_count.cc \
src/core/lib/event_engine/thread_pool/thread_pool_factory.cc \
src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc \
src/core/lib/event_engine/thready_event_engine/thready_event_engine.cc \
@ -2027,6 +2028,7 @@ LIBGRPC_UNSECURE_SRC = \
src/core/lib/event_engine/slice_buffer.cc \
src/core/lib/event_engine/tcp_socket_utils.cc \
src/core/lib/event_engine/thread_pool/original_thread_pool.cc \
src/core/lib/event_engine/thread_pool/thread_count.cc \
src/core/lib/event_engine/thread_pool/thread_pool_factory.cc \
src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc \
src/core/lib/event_engine/thready_event_engine/thready_event_engine.cc \

2
Package.swift generated

@ -1137,6 +1137,8 @@ let package = Package(
"src/core/lib/event_engine/thread_local.h",
"src/core/lib/event_engine/thread_pool/original_thread_pool.cc",
"src/core/lib/event_engine/thread_pool/original_thread_pool.h",
"src/core/lib/event_engine/thread_pool/thread_count.cc",
"src/core/lib/event_engine/thread_pool/thread_count.h",
"src/core/lib/event_engine/thread_pool/thread_pool.h",
"src/core/lib/event_engine/thread_pool/thread_pool_factory.cc",
"src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc",

@ -723,6 +723,7 @@ libs:
- src/core/lib/event_engine/shim.h
- src/core/lib/event_engine/tcp_socket_utils.h
- src/core/lib/event_engine/thread_pool/original_thread_pool.h
- src/core/lib/event_engine/thread_pool/thread_count.h
- src/core/lib/event_engine/thread_pool/thread_pool.h
- src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.h
- src/core/lib/event_engine/thready_event_engine/thready_event_engine.h
@ -1532,6 +1533,7 @@ libs:
- src/core/lib/event_engine/slice_buffer.cc
- src/core/lib/event_engine/tcp_socket_utils.cc
- src/core/lib/event_engine/thread_pool/original_thread_pool.cc
- src/core/lib/event_engine/thread_pool/thread_count.cc
- src/core/lib/event_engine/thread_pool/thread_pool_factory.cc
- src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc
- src/core/lib/event_engine/thready_event_engine/thready_event_engine.cc
@ -2126,6 +2128,7 @@ libs:
- src/core/lib/event_engine/shim.h
- src/core/lib/event_engine/tcp_socket_utils.h
- src/core/lib/event_engine/thread_pool/original_thread_pool.h
- src/core/lib/event_engine/thread_pool/thread_count.h
- src/core/lib/event_engine/thread_pool/thread_pool.h
- src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.h
- src/core/lib/event_engine/thready_event_engine/thready_event_engine.h
@ -2542,6 +2545,7 @@ libs:
- src/core/lib/event_engine/slice_buffer.cc
- src/core/lib/event_engine/tcp_socket_utils.cc
- src/core/lib/event_engine/thread_pool/original_thread_pool.cc
- src/core/lib/event_engine/thread_pool/thread_count.cc
- src/core/lib/event_engine/thread_pool/thread_pool_factory.cc
- src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc
- src/core/lib/event_engine/thready_event_engine/thready_event_engine.cc
@ -4088,6 +4092,7 @@ libs:
- src/core/lib/event_engine/shim.h
- src/core/lib/event_engine/tcp_socket_utils.h
- src/core/lib/event_engine/thread_pool/original_thread_pool.h
- src/core/lib/event_engine/thread_pool/thread_count.h
- src/core/lib/event_engine/thread_pool/thread_pool.h
- src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.h
- src/core/lib/event_engine/thready_event_engine/thready_event_engine.h
@ -4384,6 +4389,7 @@ libs:
- src/core/lib/event_engine/slice_buffer.cc
- src/core/lib/event_engine/tcp_socket_utils.cc
- src/core/lib/event_engine/thread_pool/original_thread_pool.cc
- src/core/lib/event_engine/thread_pool/thread_count.cc
- src/core/lib/event_engine/thread_pool/thread_pool_factory.cc
- src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc
- src/core/lib/event_engine/thready_event_engine/thready_event_engine.cc
@ -8731,6 +8737,7 @@ targets:
- src/core/lib/event_engine/shim.h
- src/core/lib/event_engine/tcp_socket_utils.h
- src/core/lib/event_engine/thread_pool/original_thread_pool.h
- src/core/lib/event_engine/thread_pool/thread_count.h
- src/core/lib/event_engine/thread_pool/thread_pool.h
- src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.h
- src/core/lib/event_engine/thready_event_engine/thready_event_engine.h
@ -9009,6 +9016,7 @@ targets:
- src/core/lib/event_engine/slice_buffer.cc
- src/core/lib/event_engine/tcp_socket_utils.cc
- src/core/lib/event_engine/thread_pool/original_thread_pool.cc
- src/core/lib/event_engine/thread_pool/thread_count.cc
- src/core/lib/event_engine/thread_pool/thread_pool_factory.cc
- src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc
- src/core/lib/event_engine/thready_event_engine/thready_event_engine.cc

1
config.m4 generated

@ -553,6 +553,7 @@ if test "$PHP_GRPC" != "no"; then
src/core/lib/event_engine/tcp_socket_utils.cc \
src/core/lib/event_engine/thread_local.cc \
src/core/lib/event_engine/thread_pool/original_thread_pool.cc \
src/core/lib/event_engine/thread_pool/thread_count.cc \
src/core/lib/event_engine/thread_pool/thread_pool_factory.cc \
src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc \
src/core/lib/event_engine/thready_event_engine/thready_event_engine.cc \

1
config.w32 generated

@ -518,6 +518,7 @@ if (PHP_GRPC != "no") {
"src\\core\\lib\\event_engine\\tcp_socket_utils.cc " +
"src\\core\\lib\\event_engine\\thread_local.cc " +
"src\\core\\lib\\event_engine\\thread_pool\\original_thread_pool.cc " +
"src\\core\\lib\\event_engine\\thread_pool\\thread_count.cc " +
"src\\core\\lib\\event_engine\\thread_pool\\thread_pool_factory.cc " +
"src\\core\\lib\\event_engine\\thread_pool\\work_stealing_thread_pool.cc " +
"src\\core\\lib\\event_engine\\thready_event_engine\\thready_event_engine.cc " +

2
gRPC-C++.podspec generated

@ -795,6 +795,7 @@ Pod::Spec.new do |s|
'src/core/lib/event_engine/tcp_socket_utils.h',
'src/core/lib/event_engine/thread_local.h',
'src/core/lib/event_engine/thread_pool/original_thread_pool.h',
'src/core/lib/event_engine/thread_pool/thread_count.h',
'src/core/lib/event_engine/thread_pool/thread_pool.h',
'src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.h',
'src/core/lib/event_engine/thready_event_engine/thready_event_engine.h',
@ -1850,6 +1851,7 @@ Pod::Spec.new do |s|
'src/core/lib/event_engine/tcp_socket_utils.h',
'src/core/lib/event_engine/thread_local.h',
'src/core/lib/event_engine/thread_pool/original_thread_pool.h',
'src/core/lib/event_engine/thread_pool/thread_count.h',
'src/core/lib/event_engine/thread_pool/thread_pool.h',
'src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.h',
'src/core/lib/event_engine/thready_event_engine/thready_event_engine.h',

3
gRPC-Core.podspec generated

@ -1238,6 +1238,8 @@ Pod::Spec.new do |s|
'src/core/lib/event_engine/thread_local.h',
'src/core/lib/event_engine/thread_pool/original_thread_pool.cc',
'src/core/lib/event_engine/thread_pool/original_thread_pool.h',
'src/core/lib/event_engine/thread_pool/thread_count.cc',
'src/core/lib/event_engine/thread_pool/thread_count.h',
'src/core/lib/event_engine/thread_pool/thread_pool.h',
'src/core/lib/event_engine/thread_pool/thread_pool_factory.cc',
'src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc',
@ -2596,6 +2598,7 @@ Pod::Spec.new do |s|
'src/core/lib/event_engine/tcp_socket_utils.h',
'src/core/lib/event_engine/thread_local.h',
'src/core/lib/event_engine/thread_pool/original_thread_pool.h',
'src/core/lib/event_engine/thread_pool/thread_count.h',
'src/core/lib/event_engine/thread_pool/thread_pool.h',
'src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.h',
'src/core/lib/event_engine/thready_event_engine/thready_event_engine.h',

2
grpc.gemspec generated

@ -1143,6 +1143,8 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/event_engine/thread_local.h )
s.files += %w( src/core/lib/event_engine/thread_pool/original_thread_pool.cc )
s.files += %w( src/core/lib/event_engine/thread_pool/original_thread_pool.h )
s.files += %w( src/core/lib/event_engine/thread_pool/thread_count.cc )
s.files += %w( src/core/lib/event_engine/thread_pool/thread_count.h )
s.files += %w( src/core/lib/event_engine/thread_pool/thread_pool.h )
s.files += %w( src/core/lib/event_engine/thread_pool/thread_pool_factory.cc )
s.files += %w( src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc )

3
grpc.gyp generated

@ -778,6 +778,7 @@
'src/core/lib/event_engine/slice_buffer.cc',
'src/core/lib/event_engine/tcp_socket_utils.cc',
'src/core/lib/event_engine/thread_pool/original_thread_pool.cc',
'src/core/lib/event_engine/thread_pool/thread_count.cc',
'src/core/lib/event_engine/thread_pool/thread_pool_factory.cc',
'src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc',
'src/core/lib/event_engine/thready_event_engine/thready_event_engine.cc',
@ -1283,6 +1284,7 @@
'src/core/lib/event_engine/slice_buffer.cc',
'src/core/lib/event_engine/tcp_socket_utils.cc',
'src/core/lib/event_engine/thread_pool/original_thread_pool.cc',
'src/core/lib/event_engine/thread_pool/thread_count.cc',
'src/core/lib/event_engine/thread_pool/thread_pool_factory.cc',
'src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc',
'src/core/lib/event_engine/thready_event_engine/thready_event_engine.cc',
@ -2008,6 +2010,7 @@
'src/core/lib/event_engine/slice_buffer.cc',
'src/core/lib/event_engine/tcp_socket_utils.cc',
'src/core/lib/event_engine/thread_pool/original_thread_pool.cc',
'src/core/lib/event_engine/thread_pool/thread_count.cc',
'src/core/lib/event_engine/thread_pool/thread_pool_factory.cc',
'src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc',
'src/core/lib/event_engine/thready_event_engine/thready_event_engine.cc',

2
package.xml generated

@ -1125,6 +1125,8 @@
<file baseinstalldir="/" name="src/core/lib/event_engine/thread_local.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/thread_pool/original_thread_pool.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/thread_pool/original_thread_pool.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/thread_pool/thread_count.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/thread_pool/thread_count.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/thread_pool/thread_pool.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/thread_pool/thread_pool_factory.cc" role="src" />
<file baseinstalldir="/" name="src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc" role="src" />

@ -1508,6 +1508,23 @@ grpc_cc_library(
deps = ["//:gpr_platform"],
)
grpc_cc_library(
name = "event_engine_thread_count",
srcs = [
"lib/event_engine/thread_pool/thread_count.cc",
],
hdrs = ["lib/event_engine/thread_pool/thread_count.h"],
external_deps = [
"absl/base:core_headers",
"absl/time",
],
deps = [
"time",
"useful",
"//:gpr",
],
)
grpc_cc_library(
name = "event_engine_thread_pool",
srcs = [
@ -1529,6 +1546,7 @@ grpc_cc_library(
deps = [
"common_event_engine_closures",
"event_engine_basic_work_queue",
"event_engine_thread_count",
"event_engine_thread_local",
"event_engine_trace",
"event_engine_work_queue",

@ -0,0 +1,58 @@
// Copyright 2023 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <grpc/support/port_platform.h>
#include "src/core/lib/event_engine/thread_pool/thread_count.h"
#include <inttypes.h>
#include "absl/time/clock.h"
#include "absl/time/time.h"
#include <grpc/support/log.h>
namespace grpc_event_engine {
namespace experimental {
// -------- LivingThreadCount --------
// Blocks the calling thread until the living thread count equals
// `desired_threads`.
//
// The count is polled through WaitForCountChange() in 3-second slices. Each
// time a slice ends without reaching the target, a debug message is passed to
// GRPC_LOG_EVERY_N_SEC_DELAYED using the same 3-second period.
//
// `why` is a short description of the reason for waiting; it is only used in
// the log message.
void LivingThreadCount::BlockUntilThreadCount(size_t desired_threads,
                                              const char* why) {
  constexpr grpc_core::Duration log_rate = grpc_core::Duration::Seconds(3);
  while (true) {
    auto curr_threads = WaitForCountChange(desired_threads, log_rate);
    if (curr_threads == desired_threads) break;
    // Both counts are size_t (unsigned), so they are printed with the unsigned
    // pointer-width specifier rather than the signed PRIdPTR.
    GRPC_LOG_EVERY_N_SEC_DELAYED(
        log_rate.seconds(), GPR_DEBUG,
        "Waiting for thread pool to idle before %s. (%" PRIuPTR " to %" PRIuPTR
        ")",
        why, curr_threads, desired_threads);
  }
}
// Waits until the living thread count equals `desired_threads` or `timeout`
// has elapsed, whichever happens first.
//
// Returns the most recently observed count. The value is sampled again after
// every wait, so that a change landing right at the deadline is still
// reflected in the return value instead of the pre-wait sample.
size_t LivingThreadCount::WaitForCountChange(size_t desired_threads,
                                             grpc_core::Duration timeout) {
  size_t count;
  const auto deadline = absl::Now() + absl::Milliseconds(timeout.millis());
  do {
    // The lock is scoped to one iteration; the condition variable wait below
    // is handed the same mutex.
    grpc_core::MutexLock lock(&mu_);
    count = CountLocked();
    if (count == desired_threads) break;
    cv_.WaitWithDeadline(&mu_, deadline);
    // Re-sample while still holding the lock so the caller never sees a count
    // that predates the wait.
    count = CountLocked();
  } while (count != desired_threads && absl::Now() < deadline);
  return count;
}
} // namespace experimental
} // namespace grpc_event_engine

@ -0,0 +1,161 @@
// Copyright 2023 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_SRC_CORE_LIB_EVENT_ENGINE_THREAD_POOL_THREAD_COUNT_H
#define GRPC_SRC_CORE_LIB_EVENT_ENGINE_THREAD_POOL_THREAD_COUNT_H
#include <grpc/support/port_platform.h>
#include <atomic>
#include <cstddef>
#include <numeric>
#include <utility>
#include <vector>
#include "absl/base/thread_annotations.h"
#include <grpc/support/cpu.h>
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/gprpp/time.h"
namespace grpc_event_engine {
namespace experimental {
// Tracks counts across some fixed number of shards.
// It is intended for fast increment/decrement operations, but a slower overall
// count operation.
//
// Each increment/decrement touches exactly one shard's atomic counter, chosen
// by the caller-supplied index. count() walks every shard and sums them; each
// shard is loaded independently, so the total is not a single atomic snapshot.
class BusyThreadCount {
 public:
  // Increments a per-shard counter on construction, decrements on destruction.
  // A moved-from instance holds a null counter and does nothing on
  // destruction.
  class AutoThreadCounter {
   public:
    // `counter` must be non-null; `idx` must be a valid shard index (see
    // BusyThreadCount::NextIndex()).
    AutoThreadCounter(BusyThreadCount* counter, size_t idx)
        : counter_(counter), idx_(idx) {
      counter_->Increment(idx_);
    }
    ~AutoThreadCounter() {
      if (counter_ != nullptr) counter_->Decrement(idx_);
    }
    // not copyable
    AutoThreadCounter(const AutoThreadCounter&) = delete;
    AutoThreadCounter& operator=(const AutoThreadCounter&) = delete;
    // moveable
    AutoThreadCounter(AutoThreadCounter&& other) noexcept
        : counter_(std::exchange(other.counter_, nullptr)),
          idx_(other.idx_) {}
    AutoThreadCounter& operator=(AutoThreadCounter&& other) noexcept {
      if (this != &other) {
        // Release the count this object currently holds before taking over
        // `other`'s; otherwise that increment would never be undone.
        if (counter_ != nullptr) counter_->Decrement(idx_);
        counter_ = std::exchange(other.counter_, nullptr);
        idx_ = other.idx_;
      }
      return *this;
    }

   private:
    BusyThreadCount* counter_;
    size_t idx_;
  };

  // The shard count is gpr_cpu_num_cores() clamped to the range [2, 64].
  BusyThreadCount() : shards_(grpc_core::Clamp(gpr_cpu_num_cores(), 2u, 64u)) {}

  // Returns a scoped counter that holds one busy count on shard `idx` for as
  // long as it is alive.
  AutoThreadCounter MakeAutoThreadCounter(size_t idx) {
    return AutoThreadCounter(this, idx);
  }

  // Adds one to shard `idx`. `idx` is not bounds-checked.
  void Increment(size_t idx) {
    shards_[idx].busy_count.fetch_add(1, std::memory_order_relaxed);
  }

  // Subtracts one from shard `idx`. `idx` is not bounds-checked.
  void Decrement(size_t idx) {
    shards_[idx].busy_count.fetch_sub(1, std::memory_order_relaxed);
  }

  // Returns the sum of all shard counters.
  size_t count() {
    // The initial value fixes the accumulator type, so it must be a size_t:
    // seeding with a plain `0` would narrow the running total to int on every
    // step.
    return std::accumulate(
        shards_.begin(), shards_.end(), size_t{0},
        [](size_t running, const ShardedData& d) {
          return running + d.busy_count.load(std::memory_order_relaxed);
        });
  }

  // Returns some valid index into the per-shard data, which is rotated on every
  // call to distribute load and reduce contention.
  size_t NextIndex() { return next_idx_.fetch_add(1) % shards_.size(); }

 private:
  // One counter per shard, aligned to GPR_CACHELINE_SIZE (presumably so that
  // neighbouring shards do not share a cache line).
  struct ShardedData {
    std::atomic<size_t> busy_count{0};
  } GPR_ALIGN_STRUCT(GPR_CACHELINE_SIZE);

  std::vector<ShardedData> shards_;
  std::atomic<size_t> next_idx_{0};
};
// Tracks the number of living threads. It is intended for a fast count
// operation, with relatively slower increment/decrement operations.
//
// The count is guarded by a mutex, and every change signals all waiters on the
// associated condition variable so that BlockUntilThreadCount() can observe
// it.
class LivingThreadCount {
 public:
  // Increments the global counter on construction, decrements on destruction.
  // A moved-from instance holds a null counter and does nothing on
  // destruction.
  class AutoThreadCounter {
   public:
    // `counter` must be non-null.
    explicit AutoThreadCounter(LivingThreadCount* counter) : counter_(counter) {
      counter_->Increment();
    }
    ~AutoThreadCounter() {
      if (counter_ != nullptr) counter_->Decrement();
    }
    // not copyable
    AutoThreadCounter(const AutoThreadCounter&) = delete;
    AutoThreadCounter& operator=(const AutoThreadCounter&) = delete;
    // moveable
    AutoThreadCounter(AutoThreadCounter&& other) noexcept
        : counter_(std::exchange(other.counter_, nullptr)) {}
    AutoThreadCounter& operator=(AutoThreadCounter&& other) noexcept {
      if (this != &other) {
        // Release the count this object currently holds before taking over
        // `other`'s; otherwise that increment would never be undone.
        if (counter_ != nullptr) counter_->Decrement();
        counter_ = std::exchange(other.counter_, nullptr);
      }
      return *this;
    }

   private:
    LivingThreadCount* counter_;
  };

  // Returns a scoped counter that holds one living-thread count for as long as
  // it is alive.
  AutoThreadCounter MakeAutoThreadCounter() { return AutoThreadCounter(this); }

  // Adds one to the count and wakes all waiters.
  void Increment() ABSL_LOCKS_EXCLUDED(mu_) {
    grpc_core::MutexLock lock(&mu_);
    ++living_count_;
    cv_.SignalAll();
  }

  // Subtracts one from the count and wakes all waiters.
  void Decrement() ABSL_LOCKS_EXCLUDED(mu_) {
    grpc_core::MutexLock lock(&mu_);
    --living_count_;
    cv_.SignalAll();
  }

  // Blocks until the count equals `desired_threads`. `why` describes the
  // reason for waiting and is used for logging.
  void BlockUntilThreadCount(size_t desired_threads, const char* why)
      ABSL_LOCKS_EXCLUDED(mu_);

  // Returns the current count, taking the lock to read it.
  size_t count() ABSL_LOCKS_EXCLUDED(mu_) {
    grpc_core::MutexLock lock(&mu_);
    return CountLocked();
  }

 private:
  // Returns the current count; the caller must already hold mu_.
  size_t CountLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
    return living_count_;
  }

  // Waits for the count to reach `desired_threads`, giving up once `timeout`
  // has elapsed. Returns the count that was observed.
  size_t WaitForCountChange(size_t desired_threads,
                            grpc_core::Duration timeout);

  grpc_core::Mutex mu_;
  grpc_core::CondVar cv_ ABSL_GUARDED_BY(mu_);
  size_t living_count_ ABSL_GUARDED_BY(mu_) = 0;
};
} // namespace experimental
} // namespace grpc_event_engine
#endif // GRPC_SRC_CORE_LIB_EVENT_ENGINE_THREAD_POOL_THREAD_COUNT_H

@ -209,9 +209,9 @@ void WorkStealingThreadPool::WorkStealingThreadPoolImpl::Quiesce() {
// until all other threads have exited, so we need to wait for just one thread
// running instead of zero.
bool is_threadpool_thread = g_local_queue != nullptr;
thread_count()->BlockUntilThreadCount(CounterType::kLivingThreadCount,
is_threadpool_thread ? 1 : 0,
"shutting down", work_signal());
work_signal()->SignalAll();
living_thread_count_.BlockUntilThreadCount(is_threadpool_thread ? 1 : 0,
"shutting down");
GPR_ASSERT(queue_.Empty());
quiesced_.store(true, std::memory_order_relaxed);
lifeguard_.BlockUntilShutdownAndReset();
@ -249,8 +249,8 @@ bool WorkStealingThreadPool::WorkStealingThreadPoolImpl::IsQuiesced() {
void WorkStealingThreadPool::WorkStealingThreadPoolImpl::PrepareFork() {
SetForking(true);
thread_count()->BlockUntilThreadCount(CounterType::kLivingThreadCount, 0,
"forking", &work_signal_);
work_signal_.SignalAll();
living_thread_count_.BlockUntilThreadCount(0, "forking");
lifeguard_.BlockUntilShutdownAndReset();
}
@ -329,12 +329,9 @@ void WorkStealingThreadPool::WorkStealingThreadPoolImpl::Lifeguard::
// No new threads are started when forking.
// No new work is done when forking needs to begin.
if (pool_->forking_.load()) return;
int busy_thread_count =
pool_->thread_count_.GetCount(CounterType::kBusyCount);
int living_thread_count =
pool_->thread_count_.GetCount(CounterType::kLivingThreadCount);
const auto living_thread_count = pool_->living_thread_count()->count();
// Wake an idle worker thread if there's global work to be had.
if (busy_thread_count < living_thread_count) {
if (pool_->busy_thread_count()->count() < living_thread_count) {
if (!pool_->queue_.Empty()) {
pool_->work_signal()->Signal();
backoff_.Reset();
@ -357,7 +354,8 @@ void WorkStealingThreadPool::WorkStealingThreadPoolImpl::Lifeguard::
// queue, nor any work to steal. Add more sophisticated logic about when to
// start a thread.
GRPC_EVENT_ENGINE_TRACE(
"Starting new ThreadPool thread due to backlog (total threads: %d)",
"Starting new ThreadPool thread due to backlog (total threads: %" PRIuPTR
")",
living_thread_count + 1);
pool_->StartThread();
// Tell the lifeguard to monitor the pool more closely.
@ -369,12 +367,13 @@ void WorkStealingThreadPool::WorkStealingThreadPoolImpl::Lifeguard::
WorkStealingThreadPool::ThreadState::ThreadState(
std::shared_ptr<WorkStealingThreadPoolImpl> pool)
: pool_(std::move(pool)),
auto_thread_count_(pool_->thread_count(),
CounterType::kLivingThreadCount),
auto_thread_counter_(
pool_->living_thread_count()->MakeAutoThreadCounter()),
backoff_(grpc_core::BackOff::Options()
.set_initial_backoff(kWorkerThreadMinSleepBetweenChecks)
.set_max_backoff(kWorkerThreadMaxSleepBetweenChecks)
.set_multiplier(1.3)) {}
.set_multiplier(1.3)),
busy_count_idx_(pool_->busy_thread_count()->NextIndex()) {}
void WorkStealingThreadPool::ThreadState::ThreadBody() {
g_local_queue = new BasicWorkQueue();
@ -412,8 +411,8 @@ bool WorkStealingThreadPool::ThreadState::Step() {
auto* closure = g_local_queue->PopMostRecent();
// If local work is available, run it.
if (closure != nullptr) {
ThreadCount::AutoThreadCount auto_busy{pool_->thread_count(),
CounterType::kBusyCount};
auto busy =
pool_->busy_thread_count()->MakeAutoThreadCounter(busy_count_idx_);
closure->Run();
return true;
}
@ -450,8 +449,7 @@ bool WorkStealingThreadPool::ThreadState::Step() {
// Quit a thread if the pool has more than it requires, and this thread
// has been idle long enough.
if (timed_out &&
pool_->thread_count()->GetCount(CounterType::kLivingThreadCount) >
pool_->reserve_threads() &&
pool_->living_thread_count()->count() > pool_->reserve_threads() &&
grpc_core::Timestamp::Now() - start_time > kIdleThreadLimit) {
return false;
}
@ -462,8 +460,8 @@ bool WorkStealingThreadPool::ThreadState::Step() {
return false;
}
if (closure != nullptr) {
ThreadCount::AutoThreadCount auto_busy{pool_->thread_count(),
CounterType::kBusyCount};
auto busy =
pool_->busy_thread_count()->MakeAutoThreadCounter(busy_count_idx_);
closure->Run();
}
backoff_.Reset();
@ -472,8 +470,8 @@ bool WorkStealingThreadPool::ThreadState::Step() {
void WorkStealingThreadPool::ThreadState::FinishDraining() {
// The thread is definitionally busy while draining
ThreadCount::AutoThreadCount auto_busy{pool_->thread_count(),
CounterType::kBusyCount};
auto busy =
pool_->busy_thread_count()->MakeAutoThreadCounter(busy_count_idx_);
// If a fork occurs at any point during shutdown, quit draining. The post-fork
// threads will finish draining the global queue.
while (!pool_->IsForking()) {
@ -495,72 +493,6 @@ void WorkStealingThreadPool::ThreadState::FinishDraining() {
}
}
// -------- WorkStealingThreadPool::ThreadCount --------
void WorkStealingThreadPool::ThreadCount::Add(CounterType counter_type) {
grpc_core::MutexLock lock(&wait_mu_[counter_type]);
++thread_counts_[counter_type];
wait_cv_[counter_type].SignalAll();
}
void WorkStealingThreadPool::ThreadCount::Remove(CounterType counter_type) {
grpc_core::MutexLock lock(&wait_mu_[counter_type]);
--thread_counts_[counter_type];
wait_cv_[counter_type].SignalAll();
}
void WorkStealingThreadPool::ThreadCount::BlockUntilThreadCount(
CounterType counter_type, size_t desired_threads, const char* why,
WorkSignal* work_signal) {
// Wait for all threads to exit.
work_signal->SignalAll();
while (true) {
auto curr_threads = WaitForCountChange(
counter_type, desired_threads,
grpc_core::Duration::Seconds(kBlockingQuiesceLogRateSeconds));
if (curr_threads == desired_threads) break;
GRPC_LOG_EVERY_N_SEC_DELAYED(
kBlockingQuiesceLogRateSeconds, GPR_DEBUG,
"Waiting for thread pool to idle before %s. (%" PRIdPTR " to %" PRIdPTR
")",
why, curr_threads, desired_threads);
}
}
size_t WorkStealingThreadPool::ThreadCount::WaitForCountChange(
CounterType counter_type, size_t desired_threads,
grpc_core::Duration timeout) {
size_t count;
auto deadline = absl::Now() + absl::Milliseconds(timeout.millis());
do {
grpc_core::MutexLock lock(&wait_mu_[counter_type]);
count = GetCountLocked(counter_type);
if (count == desired_threads) break;
wait_cv_[counter_type].WaitWithDeadline(&wait_mu_[counter_type], deadline);
} while (absl::Now() < deadline);
return count;
}
size_t WorkStealingThreadPool::ThreadCount::GetCount(CounterType counter_type) {
grpc_core::MutexLock lock(&wait_mu_[counter_type]);
return GetCountLocked(counter_type);
}
size_t WorkStealingThreadPool::ThreadCount::GetCountLocked(
CounterType counter_type) {
return thread_counts_[counter_type];
}
WorkStealingThreadPool::ThreadCount::AutoThreadCount::AutoThreadCount(
ThreadCount* counter, CounterType counter_type)
: counter_(counter), counter_type_(counter_type) {
counter_->Add(counter_type_);
}
WorkStealingThreadPool::ThreadCount::AutoThreadCount::~AutoThreadCount() {
counter_->Remove(counter_type_);
}
// -------- WorkStealingThreadPool::WorkSignal --------
void WorkStealingThreadPool::WorkSignal::Signal() {

@ -33,6 +33,7 @@
#include <grpc/event_engine/event_engine.h>
#include "src/core/lib/backoff/backoff.h"
#include "src/core/lib/event_engine/thread_pool/thread_count.h"
#include "src/core/lib/event_engine/thread_pool/thread_pool.h"
#include "src/core/lib/event_engine/work_queue/basic_work_queue.h"
#include "src/core/lib/event_engine/work_queue/work_queue.h"
@ -76,56 +77,6 @@ class WorkStealingThreadPool final : public ThreadPool {
grpc_core::CondVar cv_ ABSL_GUARDED_BY(mu_);
};
// Types of thread counts.
// Note this is intentionally not an enum class, the keys are used as indexes
// into the ThreadCount's private array.
enum CounterType {
kLivingThreadCount = 0,
kBusyCount,
};
class ThreadCount {
public:
// Adds 1 to the thread count for that counter type.
void Add(CounterType counter_type)
ABSL_LOCKS_EXCLUDED(wait_mu_[counter_type]);
// Subtracts 1 from the thread count for that counter type.
void Remove(CounterType counter_type)
ABSL_LOCKS_EXCLUDED(wait_mu_[counter_type]);
// Blocks until the thread count for that type reaches `desired_threads`.
void BlockUntilThreadCount(CounterType counter_type, size_t desired_threads,
const char* why, WorkSignal* work_signal)
ABSL_LOCKS_EXCLUDED(wait_mu_[counter_type]);
// Returns the current thread count for the tracked type.
size_t GetCount(CounterType counter_type)
ABSL_LOCKS_EXCLUDED(wait_mu_[counter_type]);
// Returns the current thread count for the tracked type.
size_t GetCountLocked(CounterType counter_type)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(wait_mu_[counter_type]);
// Adds and removes thread counts on construction and destruction
class AutoThreadCount {
public:
AutoThreadCount(ThreadCount* counter, CounterType counter_type);
~AutoThreadCount();
private:
ThreadCount* counter_;
CounterType counter_type_;
};
private:
// Wait for the desired count to be reached.
// Returns the current thread count either when the desired count is
// reached, or when the deadline has passed, whichever happens first.
size_t WaitForCountChange(CounterType counter_type, size_t desired_threads,
grpc_core::Duration timeout);
grpc_core::Mutex wait_mu_[2];
grpc_core::CondVar wait_cv_[2];
size_t thread_counts_[2]{0, 0};
};
// A pool of WorkQueues that participate in work stealing.
//
// Every worker thread registers and unregisters its thread-local thread pool
@ -186,7 +137,8 @@ class WorkStealingThreadPool final : public ThreadPool {
bool IsForking();
bool IsQuiesced();
size_t reserve_threads() { return reserve_threads_; }
ThreadCount* thread_count() { return &thread_count_; }
BusyThreadCount* busy_thread_count() { return &busy_thread_count_; }
LivingThreadCount* living_thread_count() { return &living_thread_count_; }
TheftRegistry* theft_registry() { return &theft_registry_; }
WorkQueue* queue() { return &queue_; }
WorkSignal* work_signal() { return &work_signal_; }
@ -221,7 +173,8 @@ class WorkStealingThreadPool final : public ThreadPool {
};
const size_t reserve_threads_;
ThreadCount thread_count_;
BusyThreadCount busy_thread_count_;
LivingThreadCount living_thread_count_;
TheftRegistry theft_registry_;
BasicWorkQueue queue_;
// Track shutdown and fork bits separately.
@ -254,11 +207,11 @@ class WorkStealingThreadPool final : public ThreadPool {
// is decremented at time of destruction. This is necessary when this thread
// state holds the last shared_ptr keeping the pool alive.
std::shared_ptr<WorkStealingThreadPoolImpl> pool_;
// auto_thread_count_ must be the second member declared, so that the thread
// count is decremented after all other state is cleaned up (preventing
// leaks).
ThreadCount::AutoThreadCount auto_thread_count_;
// auto_thread_counter_ must be declared after pool_, so that the thread
// count is decremented after all other pool state is cleaned up.
LivingThreadCount::AutoThreadCounter auto_thread_counter_;
grpc_core::BackOff backoff_;
size_t busy_count_idx_;
};
const std::shared_ptr<WorkStealingThreadPoolImpl> pool_;

@ -527,6 +527,7 @@ CORE_SOURCE_FILES = [
'src/core/lib/event_engine/tcp_socket_utils.cc',
'src/core/lib/event_engine/thread_local.cc',
'src/core/lib/event_engine/thread_pool/original_thread_pool.cc',
'src/core/lib/event_engine/thread_pool/thread_count.cc',
'src/core/lib/event_engine/thread_pool/thread_pool_factory.cc',
'src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc',
'src/core/lib/event_engine/thready_event_engine/thready_event_engine.cc',

@ -60,6 +60,7 @@ grpc_cc_test(
deps = [
"//:gpr",
"//:grpc",
"//src/core:event_engine_thread_count",
"//src/core:event_engine_thread_pool",
"//src/core:notification",
"//test/core/util:grpc_test_util_unsecure",

@ -14,11 +14,13 @@
#include "src/core/lib/event_engine/thread_pool/thread_pool.h"
#include <algorithm>
#include <atomic>
#include <chrono>
#include <cmath>
#include <functional>
#include <thread>
#include <vector>
#include "absl/time/clock.h"
#include "absl/time/time.h"
@ -27,6 +29,7 @@
#include <grpc/grpc.h>
#include "src/core/lib/event_engine/thread_pool/original_thread_pool.h"
#include "src/core/lib/event_engine/thread_pool/thread_count.h"
#include "src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.h"
#include "src/core/lib/gprpp/notification.h"
#include "src/core/lib/gprpp/thd.h"
@ -112,7 +115,7 @@ TYPED_TEST(ThreadPoolTest, ForkStressTest) {
}
runcount.fetch_add(1, std::memory_order_relaxed);
};
for (int i = 0; i < expected_runcount; i++) {
for (auto i = 0; i < expected_runcount; i++) {
pool.Run(inner_fn);
}
// simulate multiple forks at a fixed frequency
@ -141,7 +144,7 @@ TYPED_TEST(ThreadPoolTest, StartQuiesceRaceStressTest) {
std::unique_ptr<TypeParam> pool;
int i;
};
for (int i = 0; i < iter_count; i++) {
for (auto i = 0; i < iter_count; i++) {
ThdState state{std::make_unique<TypeParam>(8), i};
state.pool->PrepareFork();
grpc_core::Thread t1(
@ -209,7 +212,7 @@ TEST_F(WorkStealingThreadPoolTest, ScalesWhenBackloggedFromGlobalQueue) {
// Ensures the pool is saturated before signaling closures to continue.
std::atomic<int> waiters{0};
std::atomic<bool> signaled{false};
for (int i = 0; i < pool_thread_count; i++) {
for (auto i = 0; i < pool_thread_count; i++) {
p.Run([&]() {
waiters.fetch_add(1);
while (!signaled.load()) {
@ -232,7 +235,7 @@ TEST_F(WorkStealingThreadPoolTest, ScalesWhenBackloggedFromGlobalQueue) {
// is fixed, or the implementation is deleted, make this a typed test again.
TEST_F(WorkStealingThreadPoolTest,
ScalesWhenBackloggedFromSingleThreadLocalQueue) {
int pool_thread_count = 8;
constexpr int pool_thread_count = 8;
WorkStealingThreadPool p(pool_thread_count);
grpc_core::Notification signal;
// Ensures the pool is saturated before signaling closures to continue.
@ -262,18 +265,193 @@ TEST_F(WorkStealingThreadPoolTest,
// pool, it takes around 50s to run. When that is fixed, or the implementation
// is deleted, make this a typed test again.
TEST_F(WorkStealingThreadPoolTest, QuiesceRaceStressTest) {
int cycle_count = 333;
int thread_count = 8;
int run_count = thread_count * 2;
for (int i = 0; i < cycle_count; i++) {
constexpr int cycle_count = 333;
constexpr int thread_count = 8;
constexpr int run_count = thread_count * 2;
for (auto i = 0; i < cycle_count; i++) {
WorkStealingThreadPool p(thread_count);
for (int j = 0; j < run_count; j++) {
for (auto j = 0; j < run_count; j++) {
p.Run([]() {});
}
p.Quiesce();
}
}
// Fixture for the BusyThreadCount tests below. It adds no members, set-up, or
// tear-down of its own; it only provides the test-suite name.
class BusyThreadCountTest : public testing::Test {};
TEST_F(BusyThreadCountTest, StressTest) {
  // Many writer threads concurrently increment and decrement the busy counters
  // while a single reader thread keeps requesting the total. The magic numbers
  // below were tuned so the test runs in a reasonable amount of time.
  constexpr size_t kNumWriters = 300;
  constexpr int kRoundsPerWriter = 1000;
  constexpr int kStepsPerRound = 50;
  BusyThreadCount busy_thread_count;
  grpc_core::Notification writers_finished;
  // Reader: asks for the total over and over until the writers are done. The
  // returned value is deliberately ignored.
  std::thread reader([&]() {
    while (!writers_finished.HasBeenNotified()) {
      busy_thread_count.count();
    }
  });
  // Writer body: a fresh index is requested on every round. That is not the
  // intended usage, but it puts extra stress on NextIndex.
  const auto writer_body = [&]() {
    for (int round = 0; round < kRoundsPerWriter; ++round) {
      auto idx = busy_thread_count.NextIndex();
      for (int step = 0; step < kStepsPerRound; ++step) {
        busy_thread_count.Increment(idx);
      }
      for (int step = 0; step < kStepsPerRound; ++step) {
        busy_thread_count.Decrement(idx);
      }
    }
  };
  std::vector<std::thread> writers;
  writers.reserve(kNumWriters);
  while (writers.size() < kNumWriters) {
    writers.emplace_back(writer_body);
  }
  for (std::thread& writer : writers) {
    writer.join();
  }
  writers_finished.Notify();
  reader.join();
  // Each increment was matched by a decrement, so the total must be zero.
  ASSERT_EQ(busy_thread_count.count(), 0);
}
TEST_F(BusyThreadCountTest, AutoCountStressTest) {
  // Same shape as the plain stress test, except that the counters are driven
  // through scoped AutoThreadCounter objects instead of explicit
  // Increment/Decrement calls. The magic numbers below were tuned so the test
  // runs in a reasonable amount of time.
  constexpr size_t kNumWriters = 150;
  constexpr int kRoundsPerWriter = 1000;
  constexpr int kCountersPerRound = 30;
  BusyThreadCount busy_thread_count;
  grpc_core::Notification writers_finished;
  // Reader: asks for the total over and over until the writers are done. The
  // returned value is deliberately ignored.
  std::thread reader([&]() {
    while (!writers_finished.HasBeenNotified()) {
      busy_thread_count.count();
    }
  });
  // Writer body: each round builds a batch of scoped counters, each with its
  // own freshly requested index, and drops the whole batch when the round
  // ends.
  const auto writer_body = [&]() {
    for (int round = 0; round < kRoundsPerWriter; ++round) {
      std::vector<BusyThreadCount::AutoThreadCounter> held;
      held.reserve(kCountersPerRound);
      for (int n = 0; n < kCountersPerRound; ++n) {
        auto idx = busy_thread_count.NextIndex();
        held.push_back(busy_thread_count.MakeAutoThreadCounter(idx));
      }
    }
  };
  std::vector<std::thread> writers;
  writers.reserve(kNumWriters);
  while (writers.size() < kNumWriters) {
    writers.emplace_back(writer_body);
  }
  for (std::thread& writer : writers) {
    writer.join();
  }
  writers_finished.Notify();
  reader.join();
  // All scoped counters are gone by now, so the total must be zero.
  ASSERT_EQ(busy_thread_count.count(), 0);
}
// Fixture for the LivingThreadCount tests below. It adds no members, set-up,
// or tear-down of its own; it only provides the test-suite name.
class LivingThreadCountTest : public testing::Test {};
TEST_F(LivingThreadCountTest, StressTest) {
  // Many writer threads concurrently increment and decrement the living-thread
  // count while a single reader thread keeps requesting the total. The magic
  // numbers below were tuned so the test runs in a reasonable amount of time.
  constexpr size_t kNumWriters = 50;
  constexpr int kRoundsPerWriter = 1000;
  constexpr int kStepsPerRound = 10;
  LivingThreadCount living_thread_count;
  grpc_core::Notification writers_finished;
  // Reader: asks for the total over and over until the writers are done. The
  // returned value is deliberately ignored.
  std::thread reader([&]() {
    while (!writers_finished.HasBeenNotified()) {
      living_thread_count.count();
    }
  });
  // Writer body: each round performs a burst of increments followed by the
  // same number of decrements.
  const auto writer_body = [&]() {
    for (int round = 0; round < kRoundsPerWriter; ++round) {
      for (int step = 0; step < kStepsPerRound; ++step) {
        living_thread_count.Increment();
      }
      for (int step = 0; step < kStepsPerRound; ++step) {
        living_thread_count.Decrement();
      }
    }
  };
  std::vector<std::thread> writers;
  writers.reserve(kNumWriters);
  while (writers.size() < kNumWriters) {
    writers.emplace_back(writer_body);
  }
  for (std::thread& writer : writers) {
    writer.join();
  }
  writers_finished.Notify();
  reader.join();
  // Each increment was matched by a decrement, so the total must be zero.
  ASSERT_EQ(living_thread_count.count(), 0);
}
TEST_F(LivingThreadCountTest, AutoCountStressTest) {
  // Same shape as the plain stress test, except that the count is driven
  // through scoped AutoThreadCounter objects instead of explicit
  // Increment/Decrement calls. The magic numbers below were tuned so the test
  // runs in a reasonable amount of time.
  constexpr size_t kNumWriters = 50;
  constexpr int kRoundsPerWriter = 1000;
  constexpr int kCountersPerRound = 10;
  LivingThreadCount living_thread_count;
  grpc_core::Notification writers_finished;
  // Reader: asks for the total over and over until the writers are done. The
  // returned value is deliberately ignored.
  std::thread reader([&]() {
    while (!writers_finished.HasBeenNotified()) {
      living_thread_count.count();
    }
  });
  // Writer body: each round builds a batch of scoped counters and drops the
  // whole batch when the round ends.
  const auto writer_body = [&]() {
    for (int round = 0; round < kRoundsPerWriter; ++round) {
      std::vector<LivingThreadCount::AutoThreadCounter> held;
      held.reserve(kCountersPerRound);
      for (int n = 0; n < kCountersPerRound; ++n) {
        held.push_back(living_thread_count.MakeAutoThreadCounter());
      }
    }
  };
  std::vector<std::thread> writers;
  writers.reserve(kNumWriters);
  while (writers.size() < kNumWriters) {
    writers.emplace_back(writer_body);
  }
  for (std::thread& writer : writers) {
    writer.join();
  }
  writers_finished.Notify();
  reader.join();
  // All scoped counters are gone by now, so the total must be zero.
  ASSERT_EQ(living_thread_count.count(), 0);
}
TEST_F(LivingThreadCountTest, BlockUntilThreadCountTest) {
  constexpr size_t kNumLiving = 100;
  grpc_core::Notification release;
  LivingThreadCount living_thread_count;
  // Body of each living thread: hold a scoped counter, then park until
  // released.
  const auto living_body = [&]() {
    auto registration = living_thread_count.MakeAutoThreadCounter();
    release.WaitForNotification();
  };
  // Start N living threads.
  std::vector<std::thread> living;
  living.reserve(kNumLiving);
  while (living.size() < kNumLiving) {
    living.emplace_back(living_body);
  }
  // A separate thread releases the living threads and joins them, so the test
  // thread is free to block on the count.
  std::thread joiner([&]() {
    release.Notify();
    for (std::thread& t : living) {
      t.join();
    }
  });
  {
    // While this scoped counter is held, block until the count reaches 1.
    auto self = living_thread_count.MakeAutoThreadCounter();
    living_thread_count.BlockUntilThreadCount(1,
                                              "block until 1 thread remains");
  }
  // With that scoped counter released, block until the count reaches 0.
  living_thread_count.BlockUntilThreadCount(0,
                                            "block until all threads are gone");
  joiner.join();
  ASSERT_EQ(living_thread_count.count(), 0);
}
} // namespace experimental
} // namespace grpc_event_engine

@ -2140,6 +2140,8 @@ src/core/lib/event_engine/thread_local.cc \
src/core/lib/event_engine/thread_local.h \
src/core/lib/event_engine/thread_pool/original_thread_pool.cc \
src/core/lib/event_engine/thread_pool/original_thread_pool.h \
src/core/lib/event_engine/thread_pool/thread_count.cc \
src/core/lib/event_engine/thread_pool/thread_count.h \
src/core/lib/event_engine/thread_pool/thread_pool.h \
src/core/lib/event_engine/thread_pool/thread_pool_factory.cc \
src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc \

@ -1918,6 +1918,8 @@ src/core/lib/event_engine/thread_local.cc \
src/core/lib/event_engine/thread_local.h \
src/core/lib/event_engine/thread_pool/original_thread_pool.cc \
src/core/lib/event_engine/thread_pool/original_thread_pool.h \
src/core/lib/event_engine/thread_pool/thread_count.cc \
src/core/lib/event_engine/thread_pool/thread_count.h \
src/core/lib/event_engine/thread_pool/thread_pool.h \
src/core/lib/event_engine/thread_pool/thread_pool_factory.cc \
src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc \

Loading…
Cancel
Save