|
|
|
@ -323,16 +323,40 @@ public: |
|
|
|
|
void rewindToStop(std::vector<Q*> &in_queues, |
|
|
|
|
const std::size_t this_id) |
|
|
|
|
{ |
|
|
|
|
for (auto &&qit : ade::util::indexed(in_queues)) |
|
|
|
|
{ |
|
|
|
|
auto id2 = ade::util::index(qit); |
|
|
|
|
auto &q2 = ade::util::value(qit); |
|
|
|
|
if (this_id == id2) continue; |
|
|
|
|
size_t expected_stop_count = std::count_if(in_queues.begin(), in_queues.end(), [] (const Q* ptr) { |
|
|
|
|
return ptr != nullptr; |
|
|
|
|
}); |
|
|
|
|
|
|
|
|
|
Cmd cmd; |
|
|
|
|
while (q2 && !cv::util::holds_alternative<Stop>(cmd)) |
|
|
|
|
q2->pop(cmd); |
|
|
|
|
if (expected_stop_count > 0) { |
|
|
|
|
// NB: it requires to substract own queues id from total waiting queue count
|
|
|
|
|
// because it had got stop message before rewind was called
|
|
|
|
|
expected_stop_count--; |
|
|
|
|
} |
|
|
|
|
GAPI_LOG_DEBUG(nullptr, "id: " << this_id << ", queues count: " << in_queues.size() << |
|
|
|
|
", expected stop msg count: " << expected_stop_count); |
|
|
|
|
size_t got_stop_count = 0; |
|
|
|
|
while(got_stop_count < expected_stop_count) { |
|
|
|
|
for (auto &&qit : ade::util::indexed(in_queues)) { |
|
|
|
|
auto id2 = ade::util::index(qit); |
|
|
|
|
auto &q2 = ade::util::value(qit); |
|
|
|
|
if (this_id == id2) continue; |
|
|
|
|
|
|
|
|
|
GAPI_LOG_DEBUG(nullptr, "drain next id: " << id2 << |
|
|
|
|
", stop count (" << got_stop_count << "/" << |
|
|
|
|
expected_stop_count << ")"); |
|
|
|
|
bool got_cmd = true; |
|
|
|
|
while (q2 && got_cmd) { |
|
|
|
|
Cmd cmd; |
|
|
|
|
got_cmd = q2->try_pop(cmd); |
|
|
|
|
if (got_cmd && cv::util::holds_alternative<Stop>(cmd)) { |
|
|
|
|
got_stop_count ++; |
|
|
|
|
GAPI_LOG_DEBUG(nullptr, "got stop from id: " << id2); |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
GAPI_LOG_DEBUG(nullptr, "completed"); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// This method handles a stop sign got from some input
|
|
|
|
|