diff --git a/bazel/grpc_build_system.bzl b/bazel/grpc_build_system.bzl
index f9cee08a186..f151740a234 100644
--- a/bazel/grpc_build_system.bzl
+++ b/bazel/grpc_build_system.bzl
@@ -299,6 +299,9 @@ def expand_tests(name, srcs, deps, tags, args, exclude_pollers, uses_polling, us
     """
     poller_config = []

+    # See work_stealing_thread_pool.cc for details.
+    default_env = {"GRPC_THREAD_POOL_VERBOSE_FAILURES": "true"}
+
     if not uses_polling:
         tags = tags + ["no_uses_polling"]

@@ -309,7 +312,7 @@ def expand_tests(name, srcs, deps, tags, args, exclude_pollers, uses_polling, us
             "tags": tags,
             "args": args,
             "flaky": flaky,
-            "env": {},
+            "env": default_env,
         })
     else:
         # On linux we run the same test with the default EventEngine, once for each
@@ -329,7 +332,7 @@ def expand_tests(name, srcs, deps, tags, args, exclude_pollers, uses_polling, us
                 "args": args,
                 "env": {
                     "GRPC_POLL_STRATEGY": poller,
-                },
+                } | default_env,
                 "flaky": flaky,
             })

@@ -346,7 +349,7 @@ def expand_tests(name, srcs, deps, tags, args, exclude_pollers, uses_polling, us
             "deps": deps,
             "tags": tags + ["no_linux"],
             "args": args,
-            "env": {},
+            "env": default_env,
             "flaky": flaky,
         })
     else:
@@ -366,7 +369,7 @@ def expand_tests(name, srcs, deps, tags, args, exclude_pollers, uses_polling, us
                 "deps": deps,
                 "tags": test_tags,
                 "args": test_args,
-                "env": {},
+                "env": default_env,
                 "flaky": flaky,
             })

diff --git a/src/core/BUILD b/src/core/BUILD
index 4d88b2e78b6..ed0290c7bfb 100644
--- a/src/core/BUILD
+++ b/src/core/BUILD
@@ -1684,6 +1684,8 @@ grpc_cc_library(
     external_deps = [
         "absl/base:core_headers",
         "absl/time",
+        "absl/status",
+        "absl/strings:str_format",
     ],
     deps = [
         "time",
@@ -1707,14 +1709,17 @@ grpc_cc_library(
         "absl/container:flat_hash_set",
         "absl/functional:any_invocable",
         "absl/time",
+        "absl/types:optional",
     ],
     deps = [
         "common_event_engine_closures",
+        "env",
         "event_engine_basic_work_queue",
         "event_engine_thread_count",
         "event_engine_thread_local",
         "event_engine_trace",
         "event_engine_work_queue",
+        "examine_stack",
         "forkable",
         "no_destruct",
         "notification",
@@ -1722,7 +1727,6 @@ grpc_cc_library(
         "//:backoff",
         "//:event_engine_base_hdrs",
         "//:gpr",
-        "//:grpc_trace",
     ],
 )

diff --git a/src/core/lib/event_engine/thread_pool/thread_count.cc b/src/core/lib/event_engine/thread_pool/thread_count.cc
index 875e8c68cd8..5f8d2255aa4 100644
--- a/src/core/lib/event_engine/thread_pool/thread_count.cc
+++ b/src/core/lib/event_engine/thread_pool/thread_count.cc
@@ -17,27 +17,48 @@
 #include

+#include
+
+#include "absl/status/status.h"
+#include "absl/strings/str_format.h"
 #include "absl/time/clock.h"
 #include "absl/time/time.h"

 #include

+#include "src/core/lib/gprpp/time.h"
+
 namespace grpc_event_engine {
 namespace experimental {

 // -------- LivingThreadCount --------

-void LivingThreadCount::BlockUntilThreadCount(size_t desired_threads,
-                                              const char* why) {
-  constexpr grpc_core::Duration log_rate = grpc_core::Duration::Seconds(3);
+absl::Status LivingThreadCount::BlockUntilThreadCount(
+    size_t desired_threads, const char* why,
+    grpc_core::Duration stuck_timeout) {
+  grpc_core::Timestamp timeout_baseline = grpc_core::Timestamp::Now();
+  constexpr grpc_core::Duration log_rate = grpc_core::Duration::Seconds(5);
+  size_t prev_thread_count = 0;
   while (true) {
-    auto curr_threads = WaitForCountChange(desired_threads, log_rate);
-    if (curr_threads == desired_threads) break;
+    auto curr_threads = WaitForCountChange(desired_threads, log_rate / 2);
+    if (curr_threads == desired_threads) return absl::OkStatus();
+    auto elapsed = grpc_core::Timestamp::Now() - timeout_baseline;
+    if (curr_threads == prev_thread_count) {
+      if (elapsed > stuck_timeout) {
+        return absl::DeadlineExceededError(absl::StrFormat(
+            "Timed out after %f seconds", stuck_timeout.seconds()));
+      }
+    } else {
+      // the thread count has changed. Reset the timeout clock
+      prev_thread_count = curr_threads;
+      timeout_baseline = grpc_core::Timestamp::Now();
+    }
     GRPC_LOG_EVERY_N_SEC_DELAYED(
         log_rate.seconds(), GPR_DEBUG,
         "Waiting for thread pool to idle before %s. (%" PRIdPTR " to %" PRIdPTR
-        ")",
-        why, curr_threads, desired_threads);
+        "). Timing out in %0.f seconds.",
+        why, curr_threads, desired_threads,
+        (stuck_timeout - elapsed).seconds());
   }
 }

diff --git a/src/core/lib/event_engine/thread_pool/thread_count.h b/src/core/lib/event_engine/thread_pool/thread_count.h
index 52692dd58cb..8be6f79e944 100644
--- a/src/core/lib/event_engine/thread_pool/thread_count.h
+++ b/src/core/lib/event_engine/thread_pool/thread_count.h
@@ -151,7 +151,12 @@
     --living_count_;
     cv_.SignalAll();
   }
-  void BlockUntilThreadCount(size_t desired_threads, const char* why)
+  // Blocks the calling thread until the desired number of threads is reached.
+  // If the thread count does not change for some given `stuck_timeout`
+  // duration, this method returns error. If the thread count does change, the
+  // timeout clock is reset.
+  absl::Status BlockUntilThreadCount(size_t desired_threads, const char* why,
+                                     grpc_core::Duration stuck_timeout)
       ABSL_LOCKS_EXCLUDED(mu_);
   size_t count() ABSL_LOCKS_EXCLUDED(mu_) {
     grpc_core::MutexLock lock(&mu_);
diff --git a/src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc b/src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc
index b0356bbe24e..deb1e1c8165 100644
--- a/src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc
+++ b/src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc
@@ -15,7 +15,6 @@
 // limitations under the License.
 //
 //
-
 #include

 #include "src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.h"
@@ -24,24 +23,36 @@
 #include
 #include
+#include
 #include
 #include

+#include "absl/functional/any_invocable.h"
 #include "absl/time/clock.h"
 #include "absl/time/time.h"
+#include "absl/types/optional.h"

 #include
+#include

 #include "src/core/lib/backoff/backoff.h"
-#include "src/core/lib/debug/trace.h"
 #include "src/core/lib/event_engine/common_closures.h"
 #include "src/core/lib/event_engine/thread_local.h"
 #include "src/core/lib/event_engine/trace.h"
 #include "src/core/lib/event_engine/work_queue/basic_work_queue.h"
 #include "src/core/lib/event_engine/work_queue/work_queue.h"
+#include "src/core/lib/gprpp/crash.h"
+#include "src/core/lib/gprpp/env.h"
+#include "src/core/lib/gprpp/examine_stack.h"
 #include "src/core/lib/gprpp/thd.h"
 #include "src/core/lib/gprpp/time.h"

+#ifdef GPR_POSIX_SYNC
+#include
+#elif defined(GPR_WINDOWS)
+#include
+#endif
+
 // IWYU pragma: no_include

 // ## Thread Pool Fork-handling
 //
@@ -84,6 +95,13 @@
 // non-existent Lifeguard thread to finish. Trying a simple
 // `lifeguard_thread_.Join()` leads to memory access errors. This implementation
 // uses Notifications to coordinate startup and shutdown states.
+//
+// ## Debugging
+//
+// Set the environment variable GRPC_THREAD_POOL_VERBOSE_FAILURES=anything to
+// enable advanced debugging. When the pool takes too long to quiesce, a
+// backtrace will be printed for every running thread, and the process will
+// abort.

 namespace grpc_event_engine {
 namespace experimental {
@@ -117,6 +135,37 @@ constexpr grpc_core::Duration kLifeguardMinSleepBetweenChecks{
 // Maximum time the lifeguard thread should sleep between checking for new work.
 constexpr grpc_core::Duration kLifeguardMaxSleepBetweenChecks{
     grpc_core::Duration::Seconds(1)};
+constexpr grpc_core::Duration kBlockUntilThreadCountTimeout{
+    grpc_core::Duration::Seconds(60)};
+
+#ifdef GPR_POSIX_SYNC
+const bool g_log_verbose_failures =
+    grpc_core::GetEnv("GRPC_THREAD_POOL_VERBOSE_FAILURES").has_value();
+constexpr int kDumpStackSignal = SIGUSR1;
+#elif defined(GPR_WINDOWS)
+const bool g_log_verbose_failures =
+    grpc_core::GetEnv("GRPC_THREAD_POOL_VERBOSE_FAILURES").has_value();
+constexpr int kDumpStackSignal = SIGTERM;
+#else
+constexpr bool g_log_verbose_failures = false;
+constexpr int kDumpStackSignal = -1;
+#endif
+
+std::atomic g_reported_dump_count{0};
+
+void DumpSignalHandler(int /* sig */) {
+  const auto trace = grpc_core::GetCurrentStackTrace();
+  if (!trace.has_value()) {
+    gpr_log(GPR_ERROR, "DumpStack::%" PRIdPTR ": Stack trace not available",
+            gpr_thd_currentid());
+  } else {
+    gpr_log(GPR_ERROR, "DumpStack::%" PRIdPTR ": %s", gpr_thd_currentid(),
+            trace->c_str());
+  }
+  g_reported_dump_count.fetch_add(1);
+  grpc_core::Thread::Kill(gpr_thd_currentid());
+}
+
 }  // namespace

 thread_local WorkQueue* g_local_queue = nullptr;
@@ -125,6 +174,10 @@ thread_local WorkQueue* g_local_queue = nullptr;

 WorkStealingThreadPool::WorkStealingThreadPool(size_t reserve_threads)
     : pool_{std::make_shared(reserve_threads)} {
+  if (g_log_verbose_failures) {
+    GRPC_EVENT_ENGINE_TRACE(
+        "%s", "WorkStealingThreadPool verbose failures are enabled");
+  }
   pool_->Start();
 }

@@ -221,8 +274,13 @@ void WorkStealingThreadPool::WorkStealingThreadPoolImpl::Quiesce() {
   // running instead of zero.
   bool is_threadpool_thread = g_local_queue != nullptr;
   work_signal()->SignalAll();
-  living_thread_count_.BlockUntilThreadCount(is_threadpool_thread ? 1 : 0,
-                                             "shutting down");
+  auto threads_were_shut_down = living_thread_count_.BlockUntilThreadCount(
+      is_threadpool_thread ? 1 : 0, "shutting down",
+      g_log_verbose_failures ? kBlockUntilThreadCountTimeout
+                             : grpc_core::Duration::Infinity());
+  if (!threads_were_shut_down.ok() && g_log_verbose_failures) {
+    DumpStacksAndCrash();
+  }
   GPR_ASSERT(queue_.Empty());
   quiesced_.store(true, std::memory_order_relaxed);
   lifeguard_.BlockUntilShutdownAndReset();
 }
@@ -262,7 +320,11 @@
 void WorkStealingThreadPool::WorkStealingThreadPoolImpl::PrepareFork() {
   gpr_log(GPR_INFO, "WorkStealingThreadPoolImpl::PrepareFork");
   SetForking(true);
   work_signal_.SignalAll();
-  living_thread_count_.BlockUntilThreadCount(0, "forking");
+  auto threads_were_shut_down = living_thread_count_.BlockUntilThreadCount(
+      0, "forking", kBlockUntilThreadCountTimeout);
+  if (!threads_were_shut_down.ok() && g_log_verbose_failures) {
+    DumpStacksAndCrash();
+  }
   lifeguard_.BlockUntilShutdownAndReset();
 }

@@ -271,6 +333,37 @@ void WorkStealingThreadPool::WorkStealingThreadPoolImpl::Postfork() {
   Start();
 }

+void WorkStealingThreadPool::WorkStealingThreadPoolImpl::TrackThread(
+    gpr_thd_id tid) {
+  grpc_core::MutexLock lock(&thd_set_mu_);
+  thds_.insert(tid);
+}
+
+void WorkStealingThreadPool::WorkStealingThreadPoolImpl::UntrackThread(
+    gpr_thd_id tid) {
+  grpc_core::MutexLock lock(&thd_set_mu_);
+  thds_.erase(tid);
+}
+
+void WorkStealingThreadPool::WorkStealingThreadPoolImpl::DumpStacksAndCrash() {
+  grpc_core::MutexLock lock(&thd_set_mu_);
+  gpr_log(GPR_ERROR,
+          "Pool did not quiesce in time, gRPC will not shut down cleanly. "
+          "Dumping all %zu thread stacks.",
+          thds_.size());
+  for (const auto tid : thds_) {
+    grpc_core::Thread::Signal(tid, kDumpStackSignal);
+  }
+  // If this is a thread pool thread, wait for one fewer thread.
+  auto ignore_thread_count = g_local_queue != nullptr ? 1 : 0;
+  while (living_thread_count_.count() - ignore_thread_count >
+         g_reported_dump_count.load()) {
+    absl::SleepFor(absl::Milliseconds(200));
+  }
+  grpc_core::Crash(
+      "Pool did not quiesce in time, gRPC will not shut down cleanly.");
+}
+
 // -------- WorkStealingThreadPool::WorkStealingThreadPoolImpl::Lifeguard -----

 WorkStealingThreadPool::WorkStealingThreadPoolImpl::Lifeguard::Lifeguard(
@@ -388,6 +481,14 @@ WorkStealingThreadPool::ThreadState::ThreadState(
       busy_count_idx_(pool_->busy_thread_count()->NextIndex()) {}

 void WorkStealingThreadPool::ThreadState::ThreadBody() {
+  if (g_log_verbose_failures) {
+#ifdef GPR_POSIX_SYNC
+    std::signal(kDumpStackSignal, DumpSignalHandler);
+#elif defined(GPR_WINDOWS)
+    signal(kDumpStackSignal, DumpSignalHandler);
+#endif
+    pool_->TrackThread(gpr_thd_currentid());
+  }
   g_local_queue = new BasicWorkQueue(pool_.get());
   pool_->theft_registry()->Enroll(g_local_queue);
   ThreadLocal::SetIsEventEngineThread(true);
@@ -410,6 +511,9 @@ void WorkStealingThreadPool::ThreadState::ThreadBody() {
   GPR_ASSERT(g_local_queue->Empty());
   pool_->theft_registry()->Unenroll(g_local_queue);
   delete g_local_queue;
+  if (g_log_verbose_failures) {
+    pool_->UntrackThread(gpr_thd_currentid());
+  }
 }

 void WorkStealingThreadPool::ThreadState::SleepIfRunning() {
diff --git a/src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.h b/src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.h
index 2fc646b84eb..dfb3b578d82 100644
--- a/src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.h
+++ b/src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.h
@@ -31,6 +31,7 @@
 #include "absl/functional/any_invocable.h"

 #include
+#include

 #include "src/core/lib/backoff/backoff.h"
 #include "src/core/lib/event_engine/thread_pool/thread_count.h"
@@ -132,6 +133,9 @@ class WorkStealingThreadPool final : public ThreadPool {
     // Postfork parent and child have the same behavior.
     void PrepareFork();
     void Postfork();
+    // Thread ID tracking
+    void TrackThread(gpr_thd_id tid);
+    void UntrackThread(gpr_thd_id tid);
     // Accessor methods
     bool IsShutdown();
     bool IsForking();
@@ -172,6 +176,8 @@ class WorkStealingThreadPool final : public ThreadPool {
       std::atomic lifeguard_running_{false};
     };

+    void DumpStacksAndCrash();
+
     const size_t reserve_threads_;
     BusyThreadCount busy_thread_count_;
     LivingThreadCount living_thread_count_;
@@ -190,6 +196,9 @@ class WorkStealingThreadPool final : public ThreadPool {
     std::atomic throttled_{false};
     WorkSignal work_signal_;
     Lifeguard lifeguard_;
+    // Set of threads for verbose failure debugging
+    grpc_core::Mutex thd_set_mu_;
+    absl::flat_hash_set thds_ ABSL_GUARDED_BY(thd_set_mu_);
   };

   class ThreadState {
diff --git a/src/core/lib/gprpp/posix/thd.cc b/src/core/lib/gprpp/posix/thd.cc
index e33198a9ad3..4df83de4050 100644
--- a/src/core/lib/gprpp/posix/thd.cc
+++ b/src/core/lib/gprpp/posix/thd.cc
@@ -20,9 +20,10 @@

 #include

-#include
+#include

-#include
+#include
+#include

 #ifdef GPR_POSIX_SYNC

@@ -34,6 +35,7 @@
 #include
 #include
 #include
+#include

 #include "src/core/lib/gpr/useful.h"
 #include "src/core/lib/gprpp/crash.h"
@@ -43,6 +45,7 @@

 namespace grpc_core {
 namespace {
+
 class ThreadInternalsPosix;

 struct thd_arg {
@@ -192,6 +195,28 @@ class ThreadInternalsPosix : public internal::ThreadInternalsInterface {

 }  // namespace

+void Thread::Signal(gpr_thd_id tid, int sig) {
+  auto kill_err = pthread_kill((pthread_t)tid, sig);
+  if (kill_err != 0) {
+    gpr_log(GPR_ERROR, "pthread_kill for tid %" PRIdPTR " failed: %s", tid,
+            StrError(kill_err).c_str());
+  }
+}
+
+#ifndef GPR_ANDROID
+void Thread::Kill(gpr_thd_id tid) {
+  auto cancel_err = pthread_cancel((pthread_t)tid);
+  if (cancel_err != 0) {
+    gpr_log(GPR_ERROR, "pthread_cancel for tid %" PRIdPTR " failed: %s", tid,
+            StrError(cancel_err).c_str());
+  }
+}
+#else  // GPR_ANDROID
+void Thread::Kill(gpr_thd_id /* tid */) {
+  gpr_log(GPR_DEBUG, "Thread::Kill is not supported on Android.");
+}
+#endif
+
 Thread::Thread(const char* thd_name, void (*thd_body)(void* arg), void* arg,
                bool* success, const Options& options)
     : options_(options) {
diff --git a/src/core/lib/gprpp/thd.h b/src/core/lib/gprpp/thd.h
index a2d9101bce3..c0f218d75aa 100644
--- a/src/core/lib/gprpp/thd.h
+++ b/src/core/lib/gprpp/thd.h
@@ -31,6 +31,7 @@
 #include "absl/functional/any_invocable.h"

 #include
+#include

 namespace grpc_core {
 namespace internal {
@@ -47,6 +48,13 @@ class ThreadInternalsInterface {

 class Thread {
  public:
+  // Send a signal to the thread.
+  // This is not supported on all platforms
+  static void Signal(gpr_thd_id tid, int sig);
+  // Kill the running thread. Likely not a clean operation.
+  // This is not supported on all platforms.
+  static void Kill(gpr_thd_id tid);
+
   class Options {
    public:
     Options() : joinable_(true), tracked_(true), stack_size_(0) {}
diff --git a/src/core/lib/gprpp/time.h b/src/core/lib/gprpp/time.h
index ac52a982af0..65aa968bc8e 100644
--- a/src/core/lib/gprpp/time.h
+++ b/src/core/lib/gprpp/time.h
@@ -49,8 +49,9 @@
     uint64_t now = grpc_core::Timestamp::FromTimespecRoundDown(  \
                        gpr_now(GPR_CLOCK_MONOTONIC))             \
                        .milliseconds_after_process_epoch();      \
-    uint64_t prev_tsamp = prev.exchange(now);                    \
-    if (now - prev_tsamp > (n) * 1000) {                         \
+    if (prev == 0) prev = now;                                   \
+    if (now - prev > (n) * 1000) {                               \
+      prev = now;                                                \
       gpr_log(severity, format, __VA_ARGS__);                    \
     }                                                            \
   } while (0)
diff --git a/src/core/lib/gprpp/windows/thd.cc b/src/core/lib/gprpp/windows/thd.cc
index 58efa7e1832..ffabe3545fe 100644
--- a/src/core/lib/gprpp/windows/thd.cc
+++ b/src/core/lib/gprpp/windows/thd.cc
@@ -17,7 +17,6 @@
 //
 // Windows implementation for gpr threads.

-
 #include

 #ifdef GPR_WINDOWS
@@ -146,6 +145,16 @@ class ThreadInternalsWindows

 namespace grpc_core {

+void Thread::Signal(gpr_thd_id /* tid */, int /* sig */) {
+  // TODO(hork): Implement
+  gpr_log(GPR_DEBUG, "Thread signals are not supported on Windows.");
+}
+
+void Thread::Kill(gpr_thd_id /* tid */) {
+  // TODO(hork): Implement
+  gpr_log(GPR_DEBUG, "Thread::Kill is not supported on Windows.");
+}
+
 Thread::Thread(const char* /* thd_name */, void (*thd_body)(void* arg),
                void* arg, bool* success, const Options& options)
     : options_(options) {
diff --git a/test/core/event_engine/thread_pool_test.cc b/test/core/event_engine/thread_pool_test.cc
index 59418f46b2a..b7248e81219 100644
--- a/test/core/event_engine/thread_pool_test.cc
+++ b/test/core/event_engine/thread_pool_test.cc
@@ -16,8 +16,11 @@
 #include
 #include
 #include
+#include
 #include
+#include
 #include
+#include
 #include

 #include "absl/time/clock.h"
@@ -31,6 +34,7 @@
 #include "src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.h"
 #include "src/core/lib/gprpp/notification.h"
 #include "src/core/lib/gprpp/thd.h"
+#include "src/core/lib/gprpp/time.h"
 #include "test/core/util/test_config.h"

 namespace grpc_event_engine {
@@ -286,6 +290,15 @@ TYPED_TEST(ThreadPoolTest, WorkerThreadLocalRunWorksWithOtherPools) {
   p1.Quiesce();
 }

+TYPED_TEST(ThreadPoolTest, DISABLED_TestDumpStack) {
+  TypeParam p1(8);
+  for (size_t i = 0; i < 8; i++) {
+    p1.Run([]() { absl::SleepFor(absl::Seconds(90)); });
+  }
+  absl::SleepFor(absl::Seconds(2));
+  p1.Quiesce();
+}
+
 class BusyThreadCountTest : public testing::Test {};

 TEST_F(BusyThreadCountTest, StressTest) {
@@ -452,11 +465,11 @@ TEST_F(LivingThreadCountTest, BlockUntilThreadCountTest) {
   });
   {
     auto alive = living_thread_count.MakeAutoThreadCounter();
-    living_thread_count.BlockUntilThreadCount(1,
-                                              "block until 1 thread remains");
+    std::ignore = living_thread_count.BlockUntilThreadCount(
+        1, "block until 1 thread remains", grpc_core::Duration::Infinity());
   }
-  living_thread_count.BlockUntilThreadCount(0,
-                                            "block until all threads are gone");
+  std::ignore = living_thread_count.BlockUntilThreadCount(
+      0, "block until all threads are gone", grpc_core::Duration::Infinity());
   joiner.join();
   ASSERT_EQ(living_thread_count.count(), 0);
 }
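
Note on the quiesce-timeout pattern introduced above: BlockUntilThreadCount now fails only when the living-thread count makes no progress for a full stuck_timeout window, and it resets its baseline whenever the count changes. The sketch below is an illustrative, standalone rendering of that same pattern; it is not part of the patch and uses hypothetical names (Counter, BlockUntilCount), relying only on the C++ standard library.

// Standalone sketch of the "stuck timeout" wait used by BlockUntilThreadCount:
// block until a counter reaches a target, but fail only if the counter makes
// no progress for a full stuck_timeout window.
#include <chrono>
#include <condition_variable>
#include <cstdio>
#include <mutex>
#include <thread>

class Counter {
 public:
  explicit Counter(int initial) : count_(initial) {}

  void Decrement() {
    std::lock_guard<std::mutex> lock(mu_);
    --count_;
    cv_.notify_all();
  }

  // Returns true once `desired` is reached; returns false if the count stays
  // unchanged for longer than `stuck_timeout` (the baseline resets on every
  // observed change, like the timeout clock in the patch).
  bool BlockUntilCount(int desired, std::chrono::milliseconds stuck_timeout) {
    std::unique_lock<std::mutex> lock(mu_);
    int prev = count_;
    auto baseline = std::chrono::steady_clock::now();
    while (count_ != desired) {
      // Poll at half the stuck window, mirroring WaitForCountChange(log_rate / 2).
      cv_.wait_for(lock, stuck_timeout / 2);
      if (count_ != prev) {
        prev = count_;  // Progress was made: reset the "stuck" clock.
        baseline = std::chrono::steady_clock::now();
      } else if (std::chrono::steady_clock::now() - baseline > stuck_timeout) {
        return false;  // No progress for a full window: report a timeout.
      }
    }
    return true;
  }

 private:
  std::mutex mu_;
  std::condition_variable cv_;
  int count_;
};

int main() {
  Counter counter(3);
  std::thread worker([&counter] {
    for (int i = 0; i < 3; ++i) {
      std::this_thread::sleep_for(std::chrono::milliseconds(100));
      counter.Decrement();
    }
  });
  bool reached = counter.BlockUntilCount(0, std::chrono::milliseconds(500));
  std::printf("reached zero: %s\n", reached ? "yes" : "no (stuck)");
  worker.join();
  return 0;
}

Polling at half the stuck window means a genuinely stalled count is detected within roughly one extra poll interval, while any observed change pushes the deadline out again, which is why the pool can quiesce slowly without tripping the timeout as long as threads keep exiting.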