// This file is part of OpenCV project. // It is subject to the license terms in the LICENSE file found in the top-level directory // of this distribution and at http://opencv.org/license.html. // // Copyright (C) 2019 Intel Corporation #include "../test_precomp.hpp" #include #include #include "executor/conc_queue.hpp" namespace opencv_test { using namespace cv::gapi; TEST(ConcQueue, PushPop) { own::concurrent_bounded_queue q; for (int i = 0; i < 100; i++) { q.push(i); } for (int i = 0; i < 100; i++) { int x; q.pop(x); EXPECT_EQ(x, i); } } TEST(ConcQueue, TryPop) { own::concurrent_bounded_queue q; int x = 0; EXPECT_FALSE(q.try_pop(x)); q.push(1); EXPECT_TRUE(q.try_pop(x)); EXPECT_EQ(1, x); } TEST(ConcQueue, Clear) { own::concurrent_bounded_queue q; for (int i = 0; i < 10; i++) { q.push(i); } q.clear(); int x = 0; EXPECT_FALSE(q.try_pop(x)); } // In this test, every writer thread produce its own range of integer // numbers, writing those to a shared queue. // // Every reader thread pops elements from the queue (until -1 is // reached) and stores those in its own associated set. // // Finally, the master thread waits for completion of all other // threads and verifies that all the necessary data is // produced/obtained. using StressParam = std::tuple; // Queue capacity namespace { constexpr int STOP_SIGN = -1; constexpr int BASE = 1000; } struct ConcQueue_: public ::testing::TestWithParam { using Q = own::concurrent_bounded_queue; using S = std::unordered_set; static void writer(int base, int writes, Q& q) { for (int i = 0; i < writes; i++) { q.push(base + i); } q.push(STOP_SIGN); } static void reader(Q& q, S& s) { int x = 0; while (true) { q.pop(x); if (x == STOP_SIGN) return; s.insert(x); } } }; TEST_P(ConcQueue_, Test) { int num_writers = 0; int num_writes = 0; int num_readers = 0; std::size_t capacity = 0u; std::tie(num_writers, num_writes, num_readers, capacity) = GetParam(); CV_Assert(num_writers < 20); CV_Assert(num_writes < BASE); Q q; if (capacity) { // see below (2) CV_Assert(static_cast(capacity) > (num_writers - num_readers)); q.set_capacity(capacity); } // Start reader threads std::vector storage(num_readers); std::vector readers; for (S& s : storage) { readers.emplace_back(reader, std::ref(q), std::ref(s)); } // Start writer threads, also pre-generate reference numbers S reference; std::vector writers; for (int w = 0; w < num_writers; w++) { writers.emplace_back(writer, w*BASE, num_writes, std::ref(q)); for (int r = 0; r < num_writes; r++) { reference.insert(w*BASE + r); } } // Every writer puts a STOP_SIGN at the end, // There are three cases: // 1) num_writers == num_readers // every reader should get its own STOP_SIGN from any // of the writers // // 2) num_writers > num_readers // every reader will get a STOP_SIGN but there're more // STOP_SIGNs may be pushed to the queue - and if this // number exceeds capacity, writers block (to a deadlock). // The latter situation must be avoided at parameters level. // [a] Also not every data produced by writers will be consumed // by a reader in this case. Master thread will read the rest // // 3) num_readers > num_writers // in this case, some readers will stuck and will never get // a STOP_SIGN. Master thread will push extra STOP_SIGNs to the // queue. // Solution to (2a) S remnants; if (num_writers > num_readers) { int extra = num_writers - num_readers; while (extra) { int x = 0; q.pop(x); if (x == STOP_SIGN) extra--; else remnants.insert(x); } } // Solution to (3) if (num_readers > num_writers) { int extra = num_readers - num_writers; while (extra--) q.push(STOP_SIGN); } // Wait for completions for (auto &t : readers) t.join(); for (auto &t : writers) t.join(); // Accumulate and validate the result S result(remnants.begin(), remnants.end()); for (const auto &s : storage) result.insert(s.begin(), s.end()); EXPECT_EQ(reference, result); } INSTANTIATE_TEST_CASE_P(ConcQueueStress, ConcQueue_, Combine( Values(1, 2, 4, 8, 16) // writers , Values(1, 32, 96, 256) // writes , Values(1, 2, 10) // readers , Values(0u, 16u, 32u))); // capacity } // namespace opencv_test