[EventEngine] Fix local work queueing logic with multiple pools (#34786)

This fixes a bug which could happen if multiple WorkStealingThreadPools
existed (wstpA and wstpB), and a thread in wstpA called
`wstpB->Run(closure)`. Previously, this would have scheduled the closure
on the current wstpA thread worker's local queue.

`bm_thread_pool` results look unchanged on RBE.
pull/34793/head
AJ Heller 1 year ago committed by GitHub
parent 485ff14990
commit 66d928c440
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc
  2. 2
      src/core/lib/event_engine/work_queue/basic_work_queue.cc
  3. 5
      src/core/lib/event_engine/work_queue/basic_work_queue.h
  4. 4
      src/core/lib/event_engine/work_queue/work_queue.h
  5. 32
      test/core/event_engine/thread_pool_test.cc

@ -174,7 +174,7 @@ void WorkStealingThreadPool::PostforkChild() { pool_->Postfork(); }
WorkStealingThreadPool::WorkStealingThreadPoolImpl::WorkStealingThreadPoolImpl( WorkStealingThreadPool::WorkStealingThreadPoolImpl::WorkStealingThreadPoolImpl(
size_t reserve_threads) size_t reserve_threads)
: reserve_threads_(reserve_threads), lifeguard_(this) {} : reserve_threads_(reserve_threads), queue_(this), lifeguard_(this) {}
void WorkStealingThreadPool::WorkStealingThreadPoolImpl::Start() { void WorkStealingThreadPool::WorkStealingThreadPoolImpl::Start() {
for (size_t i = 0; i < reserve_threads_; i++) { for (size_t i = 0; i < reserve_threads_; i++) {
@ -186,7 +186,7 @@ void WorkStealingThreadPool::WorkStealingThreadPoolImpl::Start() {
void WorkStealingThreadPool::WorkStealingThreadPoolImpl::Run( void WorkStealingThreadPool::WorkStealingThreadPoolImpl::Run(
EventEngine::Closure* closure) { EventEngine::Closure* closure) {
GPR_DEBUG_ASSERT(quiesced_.load(std::memory_order_relaxed) == false); GPR_DEBUG_ASSERT(quiesced_.load(std::memory_order_relaxed) == false);
if (g_local_queue != nullptr) { if (g_local_queue != nullptr && g_local_queue->owner() == this) {
g_local_queue->Add(closure); g_local_queue->Add(closure);
} else { } else {
queue_.Add(closure); queue_.Add(closure);
@ -386,7 +386,7 @@ WorkStealingThreadPool::ThreadState::ThreadState(
busy_count_idx_(pool_->busy_thread_count()->NextIndex()) {} busy_count_idx_(pool_->busy_thread_count()->NextIndex()) {}
void WorkStealingThreadPool::ThreadState::ThreadBody() { void WorkStealingThreadPool::ThreadState::ThreadBody() {
g_local_queue = new BasicWorkQueue(); g_local_queue = new BasicWorkQueue(pool_.get());
pool_->theft_registry()->Enroll(g_local_queue); pool_->theft_registry()->Enroll(g_local_queue);
ThreadLocal::SetIsEventEngineThread(true); ThreadLocal::SetIsEventEngineThread(true);
while (Step()) { while (Step()) {

@ -23,6 +23,8 @@
namespace grpc_event_engine { namespace grpc_event_engine {
namespace experimental { namespace experimental {
BasicWorkQueue::BasicWorkQueue(void* owner) : owner_(owner) {}
bool BasicWorkQueue::Empty() const { bool BasicWorkQueue::Empty() const {
grpc_core::MutexLock lock(&mu_); grpc_core::MutexLock lock(&mu_);
return q_.empty(); return q_.empty();

@ -36,7 +36,8 @@ namespace experimental {
// closures are added to the back. // closures are added to the back.
class BasicWorkQueue : public WorkQueue { class BasicWorkQueue : public WorkQueue {
public: public:
BasicWorkQueue() = default; BasicWorkQueue() : owner_(nullptr) {}
explicit BasicWorkQueue(void* owner);
// Returns whether the queue is empty // Returns whether the queue is empty
bool Empty() const override ABSL_LOCKS_EXCLUDED(mu_); bool Empty() const override ABSL_LOCKS_EXCLUDED(mu_);
// Returns the size of the queue. // Returns the size of the queue.
@ -59,10 +60,12 @@ class BasicWorkQueue : public WorkQueue {
// Wraps an AnyInvocable and adds it to the the queue. // Wraps an AnyInvocable and adds it to the the queue.
void Add(absl::AnyInvocable<void()> invocable) override void Add(absl::AnyInvocable<void()> invocable) override
ABSL_LOCKS_EXCLUDED(mu_); ABSL_LOCKS_EXCLUDED(mu_);
const void* owner() override { return owner_; }
private: private:
mutable grpc_core::Mutex mu_; mutable grpc_core::Mutex mu_;
std::deque<EventEngine::Closure*> q_ ABSL_GUARDED_BY(mu_); std::deque<EventEngine::Closure*> q_ ABSL_GUARDED_BY(mu_);
const void* const owner_ = nullptr;
}; };
} // namespace experimental } // namespace experimental

@ -54,6 +54,10 @@ class WorkQueue {
virtual void Add(EventEngine::Closure* closure) = 0; virtual void Add(EventEngine::Closure* closure) = 0;
// Wraps an AnyInvocable and adds it to the the queue. // Wraps an AnyInvocable and adds it to the the queue.
virtual void Add(absl::AnyInvocable<void()> invocable) = 0; virtual void Add(absl::AnyInvocable<void()> invocable) = 0;
// Returns an optional owner id for queue identification.
// TODO(hork): revisit if this can be moved to the thread pool implementation
// if dynamic queue type experiments are warranted.
virtual const void* owner() = 0;
}; };
} // namespace experimental } // namespace experimental

@ -11,7 +11,6 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
#include "src/core/lib/event_engine/thread_pool/thread_pool.h" #include "src/core/lib/event_engine/thread_pool/thread_pool.h"
#include <atomic> #include <atomic>
@ -26,6 +25,7 @@
#include "gtest/gtest.h" #include "gtest/gtest.h"
#include <grpc/grpc.h> #include <grpc/grpc.h>
#include <grpc/support/thd_id.h>
#include "src/core/lib/event_engine/thread_pool/thread_count.h" #include "src/core/lib/event_engine/thread_pool/thread_count.h"
#include "src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.h" #include "src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.h"
@ -256,6 +256,36 @@ TYPED_TEST(ThreadPoolTest, QuiesceRaceStressTest) {
} }
} }
TYPED_TEST(ThreadPoolTest, WorkerThreadLocalRunWorksWithOtherPools) {
// WorkStealingThreadPools may queue work onto a thread-local queue, and that
// work may be stolen by other threads. This test tries to ensure that work
// queued from a pool-A worker-thread, to pool-B, does not end up on a pool-A
// queue.
constexpr size_t p1_run_iterations = 32;
constexpr size_t p2_run_iterations = 1000;
TypeParam p1(8);
TypeParam p2(8);
std::vector<gpr_thd_id> tid(p1_run_iterations);
std::atomic<size_t> iter_count{0};
grpc_core::Notification finished_all_iterations;
for (size_t p1_i = 0; p1_i < p1_run_iterations; p1_i++) {
p1.Run([&, p1_i, total_iterations = p1_run_iterations * p2_run_iterations] {
tid[p1_i] = gpr_thd_currentid();
for (size_t p2_i = 0; p2_i < p2_run_iterations; p2_i++) {
p2.Run([&, p1_i, total_iterations] {
EXPECT_NE(tid[p1_i], gpr_thd_currentid());
if (total_iterations == iter_count.fetch_add(1) + 1) {
finished_all_iterations.Notify();
}
});
}
});
}
finished_all_iterations.WaitForNotification();
p2.Quiesce();
p1.Quiesce();
}
class BusyThreadCountTest : public testing::Test {}; class BusyThreadCountTest : public testing::Test {};
TEST_F(BusyThreadCountTest, StressTest) { TEST_F(BusyThreadCountTest, StressTest) {

Loading…
Cancel
Save