diff --git a/modules/gapi/src/executor/gstreamingexecutor.cpp b/modules/gapi/src/executor/gstreamingexecutor.cpp index d15e17ea28..a3a2746acc 100644 --- a/modules/gapi/src/executor/gstreamingexecutor.cpp +++ b/modules/gapi/src/executor/gstreamingexecutor.cpp @@ -323,16 +323,40 @@ public: void rewindToStop(std::vector &in_queues, const std::size_t this_id) { - for (auto &&qit : ade::util::indexed(in_queues)) - { - auto id2 = ade::util::index(qit); - auto &q2 = ade::util::value(qit); - if (this_id == id2) continue; + size_t expected_stop_count = std::count_if(in_queues.begin(), in_queues.end(), [] (const Q* ptr) { + return ptr != nullptr; + }); - Cmd cmd; - while (q2 && !cv::util::holds_alternative(cmd)) - q2->pop(cmd); + if (expected_stop_count > 0) { + // NB: it requires to substract own queues id from total waiting queue count + // because it had got stop message before rewind was called + expected_stop_count--; + } + GAPI_LOG_DEBUG(nullptr, "id: " << this_id << ", queues count: " << in_queues.size() << + ", expected stop msg count: " << expected_stop_count); + size_t got_stop_count = 0; + while(got_stop_count < expected_stop_count) { + for (auto &&qit : ade::util::indexed(in_queues)) { + auto id2 = ade::util::index(qit); + auto &q2 = ade::util::value(qit); + if (this_id == id2) continue; + + GAPI_LOG_DEBUG(nullptr, "drain next id: " << id2 << + ", stop count (" << got_stop_count << "/" << + expected_stop_count << ")"); + bool got_cmd = true; + while (q2 && got_cmd) { + Cmd cmd; + got_cmd = q2->try_pop(cmd); + if (got_cmd && cv::util::holds_alternative(cmd)) { + got_stop_count ++; + GAPI_LOG_DEBUG(nullptr, "got stop from id: " << id2); + break; + } + } + } } + GAPI_LOG_DEBUG(nullptr, "completed"); } // This method handles a stop sign got from some input