diff --git a/modules/gapi/src/executor/gstreamingexecutor.cpp b/modules/gapi/src/executor/gstreamingexecutor.cpp index 41cb83f710..653d20e712 100644 --- a/modules/gapi/src/executor/gstreamingexecutor.cpp +++ b/modules/gapi/src/executor/gstreamingexecutor.cpp @@ -13,6 +13,10 @@ #include +#if !defined(GAPI_STANDALONE) +#include // GCopy -- FIXME - to be removed! +#endif // GAPI_STANDALONE + #include "api/gproto_priv.hpp" // ptr(GRunArgP) #include "compiler/passes/passes.hpp" #include "backends/common/gbackend.hpp" // createMat @@ -80,6 +84,10 @@ struct DataQueue { std::shared_ptr q; }; +struct DesyncSpecialCase { + static const char *name() { return "DesyncSpecialCase"; } +}; + std::vector reader_queues( ade::Graph &g, const ade::NodeHandle &obj) { @@ -936,19 +944,53 @@ cv::gimpl::GStreamingExecutor::GStreamingExecutor(std::unique_ptr && , isl_exec }); // Initialize queues for every operation's input - ade::TypedGraph qgr(*m_island_graph); + ade::TypedGraph qgr(*m_island_graph); + bool is_desync_start = false; for (auto eh : nh->inEdges()) { // ...only if the data is not compile-const if (const_ins.count(eh->srcNode()) == 0) { if (m_gim.metadata(eh).contains()) { qgr.metadata(eh).set(DataQueue(DataQueue::DESYNC)); + is_desync_start = true; + } else if (qgr.metadata(eh).contains()) { + // See comment below + // Limit queue size to 1 in this case + qgr.metadata(eh).set(DataQueue(1u)); } else { qgr.metadata(eh).set(DataQueue(queue_capacity)); } m_internal_queues.insert(qgr.metadata(eh).get().q.get()); } } + // WORKAROUND: + // Since now we always know desync() is followed by copy(), + // copy is always the island with DesyncIslEdge. + // Mark the node's outputs a special way so then its following + // queue sizes will be limited to 1 (to avoid copy reading more + // data in advance - as there's no other way for the underlying + // "slow" part to control it) + if (is_desync_start) { + auto isl = m_gim.metadata(nh).get().object; + // In the current implementation, such islands + // _must_ start with copy + GAPI_Assert(isl->in_ops().size() == 1u); +#if !defined(GAPI_STANDALONE) + GAPI_Assert(GModel::Graph(*m_orig_graph) + .metadata(*isl->in_ops().begin()) + .get() + .k.name == cv::gapi::core::GCopy::id()); +#endif // GAPI_STANDALONE + for (auto out_nh : nh->outNodes()) { + for (auto out_eh : out_nh->outEdges()) { + qgr.metadata(out_eh).set(DesyncSpecialCase{}); + } + } + } + // It is ok to do it here since the graph is visited in + // a topologic order and its consumers (those checking + // their input edges & initializing queues) are yet to be + // visited } break; case NodeKind::SLOT: