|
|
|
@ -160,33 +160,6 @@ TYPED_TEST(ThreadPoolTest, CanStartLotsOfClosures) { |
|
|
|
|
ASSERT_EQ(runcount.load(), pow(2, branch_factor + 1) - 1); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
TYPED_TEST(ThreadPoolTest, ScalesWhenBackloggedFromSingleThreadLocalQueue) { |
|
|
|
|
int pool_thread_count = 8; |
|
|
|
|
TypeParam p(pool_thread_count); |
|
|
|
|
grpc_core::Notification signal; |
|
|
|
|
// Ensures the pool is saturated before signaling closures to continue.
|
|
|
|
|
std::atomic<int> waiters{0}; |
|
|
|
|
std::atomic<bool> signaled{false}; |
|
|
|
|
p.Run([&]() { |
|
|
|
|
for (int i = 0; i < pool_thread_count; i++) { |
|
|
|
|
p.Run([&]() { |
|
|
|
|
waiters.fetch_add(1); |
|
|
|
|
while (!signaled.load()) { |
|
|
|
|
signal.WaitForNotification(); |
|
|
|
|
} |
|
|
|
|
}); |
|
|
|
|
} |
|
|
|
|
while (waiters.load() != pool_thread_count) { |
|
|
|
|
absl::SleepFor(absl::Milliseconds(50)); |
|
|
|
|
} |
|
|
|
|
p.Run([&]() { |
|
|
|
|
signaled.store(true); |
|
|
|
|
signal.Notify(); |
|
|
|
|
}); |
|
|
|
|
}); |
|
|
|
|
p.Quiesce(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
class WorkStealingThreadPoolTest : public ::testing::Test {}; |
|
|
|
|
|
|
|
|
|
// TODO(hork): This is currently a pathological case for the original thread
|
|
|
|
@ -217,6 +190,37 @@ TEST_F(WorkStealingThreadPoolTest, ScalesWhenBackloggedFromGlobalQueue) { |
|
|
|
|
p.Quiesce(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// TODO(hork): This is currently a pathological case for the original thread
|
|
|
|
|
// pool, it gets wedged in ~3% of runs when new threads fail to start. When that
|
|
|
|
|
// is fixed, or the implementation is deleted, make this a typed test again.
|
|
|
|
|
TEST_F(WorkStealingThreadPoolTest, |
|
|
|
|
ScalesWhenBackloggedFromSingleThreadLocalQueue) { |
|
|
|
|
int pool_thread_count = 8; |
|
|
|
|
WorkStealingThreadPool p(pool_thread_count); |
|
|
|
|
grpc_core::Notification signal; |
|
|
|
|
// Ensures the pool is saturated before signaling closures to continue.
|
|
|
|
|
std::atomic<int> waiters{0}; |
|
|
|
|
std::atomic<bool> signaled{false}; |
|
|
|
|
p.Run([&]() { |
|
|
|
|
for (int i = 0; i < pool_thread_count; i++) { |
|
|
|
|
p.Run([&]() { |
|
|
|
|
waiters.fetch_add(1); |
|
|
|
|
while (!signaled.load()) { |
|
|
|
|
signal.WaitForNotification(); |
|
|
|
|
} |
|
|
|
|
}); |
|
|
|
|
} |
|
|
|
|
while (waiters.load() != pool_thread_count) { |
|
|
|
|
absl::SleepFor(absl::Milliseconds(50)); |
|
|
|
|
} |
|
|
|
|
p.Run([&]() { |
|
|
|
|
signaled.store(true); |
|
|
|
|
signal.Notify(); |
|
|
|
|
}); |
|
|
|
|
}); |
|
|
|
|
p.Quiesce(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
} // namespace experimental
|
|
|
|
|
} // namespace grpc_event_engine
|
|
|
|
|
|
|
|
|
|