[EventEngine] Dump all thread stacks when the thread pool is wedged

This should help us debug rare flakes where the thread pool waits indefinitely.

PiperOrigin-RevId: 601546451
pull/35656/head
AJ Heller authored 10 months ago, committed by Copybara-Service
parent 7b0eb3be03
commit ba2f74a84c
  1. bazel/grpc_build_system.bzl (11 changed lines)
  2. src/core/BUILD (6 changed lines)
  3. src/core/lib/event_engine/thread_pool/thread_count.cc (35 changed lines)
  4. src/core/lib/event_engine/thread_pool/thread_count.h (7 changed lines)
  5. src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc (114 changed lines)
  6. src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.h (9 changed lines)
  7. src/core/lib/gprpp/posix/thd.cc (29 changed lines)
  8. src/core/lib/gprpp/thd.h (8 changed lines)
  9. src/core/lib/gprpp/time.h (5 changed lines)
  10. src/core/lib/gprpp/windows/thd.cc (11 changed lines)
  11. test/core/event_engine/thread_pool_test.cc (21 changed lines)

@@ -299,6 +299,9 @@ def expand_tests(name, srcs, deps, tags, args, exclude_pollers, uses_polling, us
"""
poller_config = []
# See work_stealing_thread_pool.cc for details.
default_env = {"GRPC_THREAD_POOL_VERBOSE_FAILURES": "true"}
if not uses_polling:
tags = tags + ["no_uses_polling"]
@@ -309,7 +312,7 @@ def expand_tests(name, srcs, deps, tags, args, exclude_pollers, uses_polling, us
"tags": tags,
"args": args,
"flaky": flaky,
"env": {},
"env": default_env,
})
else:
# On linux we run the same test with the default EventEngine, once for each
@@ -329,7 +332,7 @@ def expand_tests(name, srcs, deps, tags, args, exclude_pollers, uses_polling, us
"args": args,
"env": {
"GRPC_POLL_STRATEGY": poller,
},
} | default_env,
"flaky": flaky,
})
@@ -346,7 +349,7 @@ def expand_tests(name, srcs, deps, tags, args, exclude_pollers, uses_polling, us
"deps": deps,
"tags": tags + ["no_linux"],
"args": args,
"env": {},
"env": default_env,
"flaky": flaky,
})
else:
@@ -366,7 +369,7 @@ def expand_tests(name, srcs, deps, tags, args, exclude_pollers, uses_polling, us
"deps": deps,
"tags": test_tags,
"args": test_args,
"env": {},
"env": default_env,
"flaky": flaky,
})

@@ -1684,6 +1684,8 @@ grpc_cc_library(
external_deps = [
"absl/base:core_headers",
"absl/time",
"absl/status",
"absl/strings:str_format",
],
deps = [
"time",
@@ -1707,14 +1709,17 @@ grpc_cc_library(
"absl/container:flat_hash_set",
"absl/functional:any_invocable",
"absl/time",
"absl/types:optional",
],
deps = [
"common_event_engine_closures",
"env",
"event_engine_basic_work_queue",
"event_engine_thread_count",
"event_engine_thread_local",
"event_engine_trace",
"event_engine_work_queue",
"examine_stack",
"forkable",
"no_destruct",
"notification",
@@ -1722,7 +1727,6 @@ grpc_cc_library(
"//:backoff",
"//:event_engine_base_hdrs",
"//:gpr",
"//:grpc_trace",
],
)

@@ -17,27 +17,48 @@
#include <inttypes.h>
#include <cstddef>
#include "absl/status/status.h"
#include "absl/strings/str_format.h"
#include "absl/time/clock.h"
#include "absl/time/time.h"
#include <grpc/support/log.h>
#include "src/core/lib/gprpp/time.h"
namespace grpc_event_engine {
namespace experimental {
// -------- LivingThreadCount --------
void LivingThreadCount::BlockUntilThreadCount(size_t desired_threads,
const char* why) {
constexpr grpc_core::Duration log_rate = grpc_core::Duration::Seconds(3);
absl::Status LivingThreadCount::BlockUntilThreadCount(
size_t desired_threads, const char* why,
grpc_core::Duration stuck_timeout) {
grpc_core::Timestamp timeout_baseline = grpc_core::Timestamp::Now();
constexpr grpc_core::Duration log_rate = grpc_core::Duration::Seconds(5);
size_t prev_thread_count = 0;
while (true) {
auto curr_threads = WaitForCountChange(desired_threads, log_rate);
if (curr_threads == desired_threads) break;
auto curr_threads = WaitForCountChange(desired_threads, log_rate / 2);
if (curr_threads == desired_threads) return absl::OkStatus();
auto elapsed = grpc_core::Timestamp::Now() - timeout_baseline;
if (curr_threads == prev_thread_count) {
if (elapsed > stuck_timeout) {
return absl::DeadlineExceededError(absl::StrFormat(
"Timed out after %f seconds", stuck_timeout.seconds()));
}
} else {
// The thread count has changed; reset the timeout clock.
prev_thread_count = curr_threads;
timeout_baseline = grpc_core::Timestamp::Now();
}
GRPC_LOG_EVERY_N_SEC_DELAYED(
log_rate.seconds(), GPR_DEBUG,
"Waiting for thread pool to idle before %s. (%" PRIdPTR " to %" PRIdPTR
")",
why, curr_threads, desired_threads);
"). Timing out in %0.f seconds.",
why, curr_threads, desired_threads,
(stuck_timeout - elapsed).seconds());
}
}
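
For orientation, a minimal caller-side sketch of the new API (it mirrors the LivingThreadCountTest usage further down in this diff; the scenario and the DemoStuckTimeout wrapper are hypothetical):

#include <grpc/support/log.h>
#include "absl/status/status.h"
#include "src/core/lib/event_engine/thread_pool/thread_count.h"
#include "src/core/lib/gprpp/time.h"

void DemoStuckTimeout() {
  using grpc_event_engine::experimental::LivingThreadCount;
  LivingThreadCount living_thread_count;
  // Pretend one pool thread is alive and never exits.
  auto alive = living_thread_count.MakeAutoThreadCounter();
  // Ask for zero living threads, but give up if the count stays frozen.
  absl::Status status = living_thread_count.BlockUntilThreadCount(
      /*desired_threads=*/0, "shutting down",
      /*stuck_timeout=*/grpc_core::Duration::Seconds(60));
  // The count never changes here, so this returns DeadlineExceeded after ~60s.
  GPR_ASSERT(absl::IsDeadlineExceeded(status));
}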

@@ -151,7 +151,12 @@ class LivingThreadCount {
--living_count_;
cv_.SignalAll();
}
void BlockUntilThreadCount(size_t desired_threads, const char* why)
// Blocks the calling thread until the desired number of threads is reached.
// If the thread count does not change for some given `stuck_timeout`
// duration, this method returns an error. If the thread count does change, the
// timeout clock is reset.
absl::Status BlockUntilThreadCount(size_t desired_threads, const char* why,
grpc_core::Duration stuck_timeout)
ABSL_LOCKS_EXCLUDED(mu_);
size_t count() ABSL_LOCKS_EXCLUDED(mu_) {
grpc_core::MutexLock lock(&mu_);

@@ -15,7 +15,6 @@
// limitations under the License.
//
//
#include <grpc/support/port_platform.h>
#include "src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.h"
@@ -24,24 +23,36 @@
#include <atomic>
#include <chrono>
#include <cstddef>
#include <memory>
#include <utility>
#include "absl/functional/any_invocable.h"
#include "absl/time/clock.h"
#include "absl/time/time.h"
#include "absl/types/optional.h"
#include <grpc/support/log.h>
#include <grpc/support/thd_id.h>
#include "src/core/lib/backoff/backoff.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/event_engine/common_closures.h"
#include "src/core/lib/event_engine/thread_local.h"
#include "src/core/lib/event_engine/trace.h"
#include "src/core/lib/event_engine/work_queue/basic_work_queue.h"
#include "src/core/lib/event_engine/work_queue/work_queue.h"
#include "src/core/lib/gprpp/crash.h"
#include "src/core/lib/gprpp/env.h"
#include "src/core/lib/gprpp/examine_stack.h"
#include "src/core/lib/gprpp/thd.h"
#include "src/core/lib/gprpp/time.h"
#ifdef GPR_POSIX_SYNC
#include <csignal>
#elif defined(GPR_WINDOWS)
#include <signal.h>
#endif
// IWYU pragma: no_include <ratio>
// ## Thread Pool Fork-handling
@@ -84,6 +95,13 @@
// non-existent Lifeguard thread to finish. Trying a simple
// `lifeguard_thread_.Join()` leads to memory access errors. This implementation
// uses Notifications to coordinate startup and shutdown states.
//
// ## Debugging
//
// Set the environment variable GRPC_THREAD_POOL_VERBOSE_FAILURES=anything to
// enable advanced debugging. When the pool takes too long to quiesce, a
// backtrace will be printed for every running thread, and the process will
// abort.
namespace grpc_event_engine {
namespace experimental {
@@ -117,6 +135,37 @@ constexpr grpc_core::Duration kLifeguardMinSleepBetweenChecks{
// Maximum time the lifeguard thread should sleep between checking for new work.
constexpr grpc_core::Duration kLifeguardMaxSleepBetweenChecks{
grpc_core::Duration::Seconds(1)};
constexpr grpc_core::Duration kBlockUntilThreadCountTimeout{
grpc_core::Duration::Seconds(60)};
#ifdef GPR_POSIX_SYNC
const bool g_log_verbose_failures =
grpc_core::GetEnv("GRPC_THREAD_POOL_VERBOSE_FAILURES").has_value();
constexpr int kDumpStackSignal = SIGUSR1;
#elif defined(GPR_WINDOWS)
const bool g_log_verbose_failures =
grpc_core::GetEnv("GRPC_THREAD_POOL_VERBOSE_FAILURES").has_value();
constexpr int kDumpStackSignal = SIGTERM;
#else
constexpr bool g_log_verbose_failures = false;
constexpr int kDumpStackSignal = -1;
#endif
std::atomic<size_t> g_reported_dump_count{0};
void DumpSignalHandler(int /* sig */) {
const auto trace = grpc_core::GetCurrentStackTrace();
if (!trace.has_value()) {
gpr_log(GPR_ERROR, "DumpStack::%" PRIdPTR ": Stack trace not available",
gpr_thd_currentid());
} else {
gpr_log(GPR_ERROR, "DumpStack::%" PRIdPTR ": %s", gpr_thd_currentid(),
trace->c_str());
}
g_reported_dump_count.fetch_add(1);
grpc_core::Thread::Kill(gpr_thd_currentid());
}
} // namespace
thread_local WorkQueue* g_local_queue = nullptr;
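
The "## Debugging" comment above describes the signal-driven dump flow; a stripped-down, POSIX-only sketch of that pattern follows (illustrative only: it uses raw pthread calls and prints the thread id where the real handler prints grpc_core::GetCurrentStackTrace() and then cancels the thread):

#include <csignal>
#include <cstdio>
#include <pthread.h>
#include <unistd.h>

// Hypothetical standalone demo; compile with -pthread.
void DumpHandler(int /*sig*/) {
  // Stand-in for the real backtrace dump. fprintf is not strictly
  // async-signal-safe, but it keeps the demo self-contained.
  fprintf(stderr, "DumpStack::%lu: <backtrace would go here>\n",
          (unsigned long)pthread_self());
}

void* Worker(void*) {
  std::signal(SIGUSR1, DumpHandler);  // each worker installs the handler
  pause();                            // simulate a wedged worker
  return nullptr;
}

int main() {
  pthread_t tid;
  pthread_create(&tid, nullptr, Worker, nullptr);
  sleep(1);                     // give the worker time to install the handler
  pthread_kill(tid, SIGUSR1);   // the pool does this for every tracked tid
  pthread_join(tid, nullptr);   // pause() returns once the handler has run
}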
@@ -125,6 +174,10 @@ thread_local WorkQueue* g_local_queue = nullptr;
WorkStealingThreadPool::WorkStealingThreadPool(size_t reserve_threads)
: pool_{std::make_shared<WorkStealingThreadPoolImpl>(reserve_threads)} {
if (g_log_verbose_failures) {
GRPC_EVENT_ENGINE_TRACE(
"%s", "WorkStealingThreadPool verbose failures are enabled");
}
pool_->Start();
}
@@ -221,8 +274,13 @@ void WorkStealingThreadPool::WorkStealingThreadPoolImpl::Quiesce() {
// running instead of zero.
bool is_threadpool_thread = g_local_queue != nullptr;
work_signal()->SignalAll();
living_thread_count_.BlockUntilThreadCount(is_threadpool_thread ? 1 : 0,
"shutting down");
auto threads_were_shut_down = living_thread_count_.BlockUntilThreadCount(
is_threadpool_thread ? 1 : 0, "shutting down",
g_log_verbose_failures ? kBlockUntilThreadCountTimeout
: grpc_core::Duration::Infinity());
if (!threads_were_shut_down.ok() && g_log_verbose_failures) {
DumpStacksAndCrash();
}
GPR_ASSERT(queue_.Empty());
quiesced_.store(true, std::memory_order_relaxed);
lifeguard_.BlockUntilShutdownAndReset();
@@ -262,7 +320,11 @@ void WorkStealingThreadPool::WorkStealingThreadPoolImpl::PrepareFork() {
gpr_log(GPR_INFO, "WorkStealingThreadPoolImpl::PrepareFork");
SetForking(true);
work_signal_.SignalAll();
living_thread_count_.BlockUntilThreadCount(0, "forking");
auto threads_were_shut_down = living_thread_count_.BlockUntilThreadCount(
0, "forking", kBlockUntilThreadCountTimeout);
if (!threads_were_shut_down.ok() && g_log_verbose_failures) {
DumpStacksAndCrash();
}
lifeguard_.BlockUntilShutdownAndReset();
}
@@ -271,6 +333,37 @@ void WorkStealingThreadPool::WorkStealingThreadPoolImpl::Postfork() {
Start();
}
void WorkStealingThreadPool::WorkStealingThreadPoolImpl::TrackThread(
gpr_thd_id tid) {
grpc_core::MutexLock lock(&thd_set_mu_);
thds_.insert(tid);
}
void WorkStealingThreadPool::WorkStealingThreadPoolImpl::UntrackThread(
gpr_thd_id tid) {
grpc_core::MutexLock lock(&thd_set_mu_);
thds_.erase(tid);
}
void WorkStealingThreadPool::WorkStealingThreadPoolImpl::DumpStacksAndCrash() {
grpc_core::MutexLock lock(&thd_set_mu_);
gpr_log(GPR_ERROR,
"Pool did not quiesce in time, gRPC will not shut down cleanly. "
"Dumping all %zu thread stacks.",
thds_.size());
for (const auto tid : thds_) {
grpc_core::Thread::Signal(tid, kDumpStackSignal);
}
// If this is a thread pool thread, wait for one fewer thread.
auto ignore_thread_count = g_local_queue != nullptr ? 1 : 0;
while (living_thread_count_.count() - ignore_thread_count >
g_reported_dump_count.load()) {
absl::SleepFor(absl::Milliseconds(200));
}
grpc_core::Crash(
"Pool did not quiesce in time, gRPC will not shut down cleanly.");
}
// -------- WorkStealingThreadPool::WorkStealingThreadPoolImpl::Lifeguard -----
WorkStealingThreadPool::WorkStealingThreadPoolImpl::Lifeguard::Lifeguard(
@@ -388,6 +481,14 @@ WorkStealingThreadPool::ThreadState::ThreadState(
busy_count_idx_(pool_->busy_thread_count()->NextIndex()) {}
void WorkStealingThreadPool::ThreadState::ThreadBody() {
if (g_log_verbose_failures) {
#ifdef GPR_POSIX_SYNC
std::signal(kDumpStackSignal, DumpSignalHandler);
#elif defined(GPR_WINDOWS)
signal(kDumpStackSignal, DumpSignalHandler);
#endif
pool_->TrackThread(gpr_thd_currentid());
}
g_local_queue = new BasicWorkQueue(pool_.get());
pool_->theft_registry()->Enroll(g_local_queue);
ThreadLocal::SetIsEventEngineThread(true);
@@ -410,6 +511,9 @@ void WorkStealingThreadPool::ThreadState::ThreadBody() {
GPR_ASSERT(g_local_queue->Empty());
pool_->theft_registry()->Unenroll(g_local_queue);
delete g_local_queue;
if (g_log_verbose_failures) {
pool_->UntrackThread(gpr_thd_currentid());
}
}
void WorkStealingThreadPool::ThreadState::SleepIfRunning() {

@@ -31,6 +31,7 @@
#include "absl/functional/any_invocable.h"
#include <grpc/event_engine/event_engine.h>
#include <grpc/support/thd_id.h>
#include "src/core/lib/backoff/backoff.h"
#include "src/core/lib/event_engine/thread_pool/thread_count.h"
@@ -132,6 +133,9 @@ class WorkStealingThreadPool final : public ThreadPool {
// Postfork parent and child have the same behavior.
void PrepareFork();
void Postfork();
// Thread ID tracking
void TrackThread(gpr_thd_id tid);
void UntrackThread(gpr_thd_id tid);
// Accessor methods
bool IsShutdown();
bool IsForking();
@@ -172,6 +176,8 @@ class WorkStealingThreadPool final : public ThreadPool {
std::atomic<bool> lifeguard_running_{false};
};
void DumpStacksAndCrash();
const size_t reserve_threads_;
BusyThreadCount busy_thread_count_;
LivingThreadCount living_thread_count_;
@@ -190,6 +196,9 @@ class WorkStealingThreadPool final : public ThreadPool {
std::atomic<bool> throttled_{false};
WorkSignal work_signal_;
Lifeguard lifeguard_;
// Set of threads for verbose failure debugging
grpc_core::Mutex thd_set_mu_;
absl::flat_hash_set<gpr_thd_id> thds_ ABSL_GUARDED_BY(thd_set_mu_);
};
class ThreadState {

@@ -20,9 +20,10 @@
#include <grpc/support/port_platform.h>
#include <string>
#include <inttypes.h>
#include <grpc/support/time.h>
#include <csignal>
#include <string>
#ifdef GPR_POSIX_SYNC
@@ -34,6 +35,7 @@
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
#include <grpc/support/thd_id.h>
#include <grpc/support/time.h>
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/crash.h"
@@ -43,6 +45,7 @@
namespace grpc_core {
namespace {
class ThreadInternalsPosix;
struct thd_arg {
@@ -192,6 +195,28 @@ class ThreadInternalsPosix : public internal::ThreadInternalsInterface {
} // namespace
void Thread::Signal(gpr_thd_id tid, int sig) {
auto kill_err = pthread_kill((pthread_t)tid, sig);
if (kill_err != 0) {
gpr_log(GPR_ERROR, "pthread_kill for tid %" PRIdPTR " failed: %s", tid,
StrError(kill_err).c_str());
}
}
#ifndef GPR_ANDROID
void Thread::Kill(gpr_thd_id tid) {
auto cancel_err = pthread_cancel((pthread_t)tid);
if (cancel_err != 0) {
gpr_log(GPR_ERROR, "pthread_cancel for tid %" PRIdPTR " failed: %s", tid,
StrError(cancel_err).c_str());
}
}
#else // GPR_ANDROID
void Thread::Kill(gpr_thd_id /* tid */) {
gpr_log(GPR_DEBUG, "Thread::Kill is not supported on Android.");
}
#endif
Thread::Thread(const char* thd_name, void (*thd_body)(void* arg), void* arg,
bool* success, const Options& options)
: options_(options) {

@@ -31,6 +31,7 @@
#include "absl/functional/any_invocable.h"
#include <grpc/support/log.h>
#include <grpc/support/thd_id.h>
namespace grpc_core {
namespace internal {
@@ -47,6 +48,13 @@ class ThreadInternalsInterface {
class Thread {
public:
// Send a signal to the thread.
// This is not supported on all platforms.
static void Signal(gpr_thd_id tid, int sig);
// Kill the running thread. Likely not a clean operation.
// This is not supported on all platforms.
static void Kill(gpr_thd_id tid);
class Options {
public:
Options() : joinable_(true), tracked_(true), stack_size_(0) {}
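
A hedged usage sketch for these new helpers (the RequestStackDump wrapper and worker_tid are hypothetical; in the pool the id comes from TrackThread()/gpr_thd_currentid() and the signal is kDumpStackSignal):

#include <csignal>
#include <grpc/support/thd_id.h>
#include "src/core/lib/gprpp/thd.h"

// Illustrative only: nudge a wedged worker to dump its stack, roughly what
// WorkStealingThreadPoolImpl::DumpStacksAndCrash() does for each tracked tid.
void RequestStackDump(gpr_thd_id worker_tid) {
  // Delivered via pthread_kill() on POSIX; a logged stub on Windows.
  grpc_core::Thread::Signal(worker_tid, SIGUSR1);
  // The worker's handler then calls Thread::Kill(gpr_thd_currentid()), which
  // is pthread_cancel() on POSIX and a logged no-op on Android and Windows.
}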

@@ -49,8 +49,9 @@
uint64_t now = grpc_core::Timestamp::FromTimespecRoundDown( \
gpr_now(GPR_CLOCK_MONOTONIC)) \
.milliseconds_after_process_epoch(); \
uint64_t prev_tsamp = prev.exchange(now); \
if (now - prev_tsamp > (n) * 1000) { \
if (prev == 0) prev = now; \
if (now - prev > (n) * 1000) { \
prev = now; \
gpr_log(severity, format, __VA_ARGS__); \
} \
} while (0)

@@ -17,7 +17,6 @@
//
// Windows implementation for gpr threads.
#include <grpc/support/port_platform.h>
#ifdef GPR_WINDOWS
@@ -146,6 +145,16 @@ class ThreadInternalsWindows
namespace grpc_core {
void Thread::Signal(gpr_thd_id /* tid */, int /* sig */) {
// TODO(hork): Implement
gpr_log(GPR_DEBUG, "Thread signals are not supported on Windows.");
}
void Thread::Kill(gpr_thd_id /* tid */) {
// TODO(hork): Implement
gpr_log(GPR_DEBUG, "Thread::Kill is not supported on Windows.");
}
Thread::Thread(const char* /* thd_name */, void (*thd_body)(void* arg),
void* arg, bool* success, const Options& options)
: options_(options) {

@@ -16,8 +16,11 @@
#include <atomic>
#include <chrono>
#include <cmath>
#include <cstddef>
#include <functional>
#include <memory>
#include <thread>
#include <tuple>
#include <vector>
#include "absl/time/clock.h"
@@ -31,6 +34,7 @@
#include "src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.h"
#include "src/core/lib/gprpp/notification.h"
#include "src/core/lib/gprpp/thd.h"
#include "src/core/lib/gprpp/time.h"
#include "test/core/util/test_config.h"
namespace grpc_event_engine {
@@ -286,6 +290,15 @@ TYPED_TEST(ThreadPoolTest, WorkerThreadLocalRunWorksWithOtherPools) {
p1.Quiesce();
}
TYPED_TEST(ThreadPoolTest, DISABLED_TestDumpStack) {
TypeParam p1(8);
for (size_t i = 0; i < 8; i++) {
p1.Run([]() { absl::SleepFor(absl::Seconds(90)); });
}
absl::SleepFor(absl::Seconds(2));
p1.Quiesce();
}
class BusyThreadCountTest : public testing::Test {};
TEST_F(BusyThreadCountTest, StressTest) {
@@ -452,11 +465,11 @@ TEST_F(LivingThreadCountTest, BlockUntilThreadCountTest) {
});
{
auto alive = living_thread_count.MakeAutoThreadCounter();
living_thread_count.BlockUntilThreadCount(1,
"block until 1 thread remains");
std::ignore = living_thread_count.BlockUntilThreadCount(
1, "block until 1 thread remains", grpc_core::Duration::Infinity());
}
living_thread_count.BlockUntilThreadCount(0,
"block until all threads are gone");
std::ignore = living_thread_count.BlockUntilThreadCount(
0, "block until all threads are gone", grpc_core::Duration::Infinity());
joiner.join();
ASSERT_EQ(living_thread_count.count(), 0);
}
