diff --git a/src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc b/src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc
index efd3f86c60b..5b28f3c7b91 100644
--- a/src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc
+++ b/src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc
@@ -174,7 +174,7 @@ void WorkStealingThreadPool::PostforkChild() { pool_->Postfork(); }
 
 WorkStealingThreadPool::WorkStealingThreadPoolImpl::WorkStealingThreadPoolImpl(
     size_t reserve_threads)
-    : reserve_threads_(reserve_threads), lifeguard_(this) {}
+    : reserve_threads_(reserve_threads), queue_(this), lifeguard_(this) {}
 
 void WorkStealingThreadPool::WorkStealingThreadPoolImpl::Start() {
   for (size_t i = 0; i < reserve_threads_; i++) {
@@ -186,7 +186,7 @@ void WorkStealingThreadPool::WorkStealingThreadPoolImpl::Start() {
 void WorkStealingThreadPool::WorkStealingThreadPoolImpl::Run(
     EventEngine::Closure* closure) {
   GPR_DEBUG_ASSERT(quiesced_.load(std::memory_order_relaxed) == false);
-  if (g_local_queue != nullptr) {
+  if (g_local_queue != nullptr && g_local_queue->owner() == this) {
     g_local_queue->Add(closure);
   } else {
     queue_.Add(closure);
@@ -386,7 +386,7 @@ WorkStealingThreadPool::ThreadState::ThreadState(
       busy_count_idx_(pool_->busy_thread_count()->NextIndex()) {}
 
 void WorkStealingThreadPool::ThreadState::ThreadBody() {
-  g_local_queue = new BasicWorkQueue();
+  g_local_queue = new BasicWorkQueue(pool_.get());
   pool_->theft_registry()->Enroll(g_local_queue);
   ThreadLocal::SetIsEventEngineThread(true);
   while (Step()) {
diff --git a/src/core/lib/event_engine/work_queue/basic_work_queue.cc b/src/core/lib/event_engine/work_queue/basic_work_queue.cc
index 5843de6dee4..fd41b93fb24 100644
--- a/src/core/lib/event_engine/work_queue/basic_work_queue.cc
+++ b/src/core/lib/event_engine/work_queue/basic_work_queue.cc
@@ -23,6 +23,8 @@
 namespace grpc_event_engine {
 namespace experimental {
 
+BasicWorkQueue::BasicWorkQueue(void* owner) : owner_(owner) {}
+
 bool BasicWorkQueue::Empty() const {
   grpc_core::MutexLock lock(&mu_);
   return q_.empty();
diff --git a/src/core/lib/event_engine/work_queue/basic_work_queue.h b/src/core/lib/event_engine/work_queue/basic_work_queue.h
index 78ea0f27ade..f04d185333b 100644
--- a/src/core/lib/event_engine/work_queue/basic_work_queue.h
+++ b/src/core/lib/event_engine/work_queue/basic_work_queue.h
@@ -36,7 +36,8 @@ namespace experimental {
 // closures are added to the back.
 class BasicWorkQueue : public WorkQueue {
  public:
-  BasicWorkQueue() = default;
+  BasicWorkQueue() : owner_(nullptr) {}
+  explicit BasicWorkQueue(void* owner);
   // Returns whether the queue is empty
   bool Empty() const override ABSL_LOCKS_EXCLUDED(mu_);
   // Returns the size of the queue.
@@ -59,10 +60,12 @@ class BasicWorkQueue : public WorkQueue {
   // Wraps an AnyInvocable and adds it to the the queue.
   void Add(absl::AnyInvocable<void()> invocable) override
       ABSL_LOCKS_EXCLUDED(mu_);
+  const void* owner() override { return owner_; }
 
  private:
   mutable grpc_core::Mutex mu_;
   std::deque<EventEngine::Closure*> q_ ABSL_GUARDED_BY(mu_);
+  const void* const owner_ = nullptr;
 };
 
 }  // namespace experimental
diff --git a/src/core/lib/event_engine/work_queue/work_queue.h b/src/core/lib/event_engine/work_queue/work_queue.h
index 262aff29162..270a88e3a09 100644
--- a/src/core/lib/event_engine/work_queue/work_queue.h
+++ b/src/core/lib/event_engine/work_queue/work_queue.h
@@ -54,6 +54,10 @@ class WorkQueue {
   virtual void Add(EventEngine::Closure* closure) = 0;
   // Wraps an AnyInvocable and adds it to the the queue.
   virtual void Add(absl::AnyInvocable<void()> invocable) = 0;
+  // Returns an optional owner id for queue identification.
+  // TODO(hork): revisit if this can be moved to the thread pool implementation
+  // if dynamic queue type experiments are warranted.
+  virtual const void* owner() = 0;
 };
 
 }  // namespace experimental
diff --git a/test/core/event_engine/thread_pool_test.cc b/test/core/event_engine/thread_pool_test.cc
index 55fe074d72a..59418f46b2a 100644
--- a/test/core/event_engine/thread_pool_test.cc
+++ b/test/core/event_engine/thread_pool_test.cc
@@ -11,7 +11,6 @@
 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 // See the License for the specific language governing permissions and
 // limitations under the License.
-
 #include "src/core/lib/event_engine/thread_pool/thread_pool.h"
 
 #include <stddef.h>
@@ -26,6 +25,7 @@
 #include "gtest/gtest.h"
 
 #include <grpc/grpc.h>
+#include <grpc/support/thd_id.h>
 
 #include "src/core/lib/event_engine/thread_pool/thread_count.h"
 #include "src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.h"
@@ -256,6 +256,36 @@ TYPED_TEST(ThreadPoolTest, QuiesceRaceStressTest) {
   }
 }
 
+TYPED_TEST(ThreadPoolTest, WorkerThreadLocalRunWorksWithOtherPools) {
+  // WorkStealingThreadPools may queue work onto a thread-local queue, and that
+  // work may be stolen by other threads. This test tries to ensure that work
+  // queued from a pool-A worker-thread, to pool-B, does not end up on a pool-A
+  // queue.
+  constexpr size_t p1_run_iterations = 32;
+  constexpr size_t p2_run_iterations = 1000;
+  TypeParam p1(8);
+  TypeParam p2(8);
+  std::vector<gpr_thd_id> tid(p1_run_iterations);
+  std::atomic<size_t> iter_count{0};
+  grpc_core::Notification finished_all_iterations;
+  for (size_t p1_i = 0; p1_i < p1_run_iterations; p1_i++) {
+    p1.Run([&, p1_i, total_iterations = p1_run_iterations * p2_run_iterations] {
+      tid[p1_i] = gpr_thd_currentid();
+      for (size_t p2_i = 0; p2_i < p2_run_iterations; p2_i++) {
+        p2.Run([&, p1_i, total_iterations] {
+          EXPECT_NE(tid[p1_i], gpr_thd_currentid());
+          if (total_iterations == iter_count.fetch_add(1) + 1) {
+            finished_all_iterations.Notify();
+          }
+        });
+      }
+    });
+  }
+  finished_all_iterations.WaitForNotification();
+  p2.Quiesce();
+  p1.Quiesce();
+}
+
 class BusyThreadCountTest : public testing::Test {};
 
 TEST_F(BusyThreadCountTest, StressTest) {
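
To make the routing rule in WorkStealingThreadPoolImpl::Run() easier to see outside the patch context, here is a minimal, self-contained sketch of the owner-tagged behavior. Everything in it is invented for illustration (MiniQueue, MiniPool, the Size()/GlobalQueueSize() helpers are not gRPC APIs); only the owner() check mirrors the diff above.

// Illustrative sketch only, not the gRPC implementation.
#include <cstddef>
#include <deque>
#include <functional>
#include <iostream>
#include <mutex>

// Minimal owner-tagged queue, loosely modeled on BasicWorkQueue above.
class MiniQueue {
 public:
  MiniQueue() = default;
  explicit MiniQueue(void* owner) : owner_(owner) {}
  // The owner tag lets a pool check whether a thread-local queue is its own.
  const void* owner() const { return owner_; }
  void Add(std::function<void()> fn) {
    std::lock_guard<std::mutex> lock(mu_);
    q_.push_back(std::move(fn));
  }
  size_t Size() const {
    std::lock_guard<std::mutex> lock(mu_);
    return q_.size();
  }

 private:
  const void* const owner_ = nullptr;
  mutable std::mutex mu_;
  std::deque<std::function<void()>> q_;
};

// Stand-in for g_local_queue: the local queue of the current worker thread.
thread_local MiniQueue* g_local_queue = nullptr;

// Stand-in for the routing decision in WorkStealingThreadPoolImpl::Run().
class MiniPool {
 public:
  MiniPool() : global_queue_(this) {}
  void Run(std::function<void()> fn) {
    // Use the calling thread's local queue only if that thread belongs to
    // *this* pool; otherwise fall back to this pool's own queue. Without the
    // owner() check, work submitted to pool B from a pool A worker thread
    // would land on pool A's local queue and never reach pool B.
    if (g_local_queue != nullptr && g_local_queue->owner() == this) {
      g_local_queue->Add(std::move(fn));
    } else {
      global_queue_.Add(std::move(fn));
    }
  }
  size_t GlobalQueueSize() const { return global_queue_.Size(); }

 private:
  MiniQueue global_queue_;
};

int main() {
  MiniPool pool_a;
  MiniPool pool_b;
  // Simulate the current thread being a pool_a worker with its own queue.
  MiniQueue local_for_a(&pool_a);
  g_local_queue = &local_for_a;

  pool_a.Run([] {});  // owner matches: stays on the worker's local queue
  pool_b.Run([] {});  // owner differs: routed to pool_b's global queue

  std::cout << "pool_a local queue size:  " << local_for_a.Size() << "\n"       // 1
            << "pool_a global queue size: " << pool_a.GlobalQueueSize() << "\n"  // 0
            << "pool_b global queue size: " << pool_b.GlobalQueueSize() << "\n"; // 1
}

Built with any C++11-or-later compiler, the sketch queues the first closure on the worker's local queue and the second on pool_b's global queue, which is the cross-pool case the new WorkerThreadLocalRunWorksWithOtherPools test exercises.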