Merge pull request #17000 from rgarnov:async_islands

pull/17033/head
Alexander Alekhin 5 years ago
commit 0556450801
  1. 7
      modules/gapi/include/opencv2/gapi/garray.hpp
  2. 7
      modules/gapi/include/opencv2/gapi/gopaque.hpp
  3. 23
      modules/gapi/src/api/gproto.cpp
  4. 6
      modules/gapi/src/api/gproto_priv.hpp
  5. 31
      modules/gapi/src/compiler/gislandmodel.cpp
  6. 25
      modules/gapi/src/compiler/gislandmodel.hpp
  7. 7
      modules/gapi/src/executor/gexecutor.cpp
  8. 337
      modules/gapi/src/executor/gstreamingexecutor.cpp
  9. 34
      modules/gapi/test/internal/gapi_int_proto_tests.cpp
  10. 4
      modules/gapi/test/streaming/gapi_streaming_tests.cpp

@ -29,7 +29,6 @@ namespace cv
// (user-inaccessible) classes.
class GNode;
struct GOrigin;
template<typename T> class GArray;
/**
@ -116,6 +115,7 @@ namespace detail
virtual ~BasicVectorRef() {}
virtual void mov(BasicVectorRef &ref) = 0;
virtual const void* ptr() const = 0;
};
template<typename T> class VectorRefT final: public BasicVectorRef
@ -208,6 +208,8 @@ namespace detail
GAPI_Assert(tv != nullptr);
wref() = std::move(tv->wref());
}
virtual const void* ptr() const override { return &rref(); }
};
// This class strips type information from VectorRefT<> and makes it usable
@ -262,6 +264,9 @@ namespace detail
{
return m_ref->m_desc;
}
// May be used to uniquely identify this object internally
const void *ptr() const { return m_ref->ptr(); }
};
// Helper (FIXME: work-around?)

@ -25,7 +25,6 @@ namespace cv
// (user-inaccessible) classes.
class GNode;
struct GOrigin;
template<typename T> class GOpaque;
/**
@ -107,6 +106,7 @@ namespace detail
virtual ~BasicOpaqueRef() {}
virtual void mov(BasicOpaqueRef &ref) = 0;
virtual const void* ptr() const = 0;
};
template<typename T> class OpaqueRefT final: public BasicOpaqueRef
@ -198,6 +198,8 @@ namespace detail
GAPI_Assert(tv != nullptr);
wref() = std::move(tv->wref());
}
virtual const void* ptr() const override { return &rref(); }
};
// This class strips type information from OpaqueRefT<> and makes it usable
@ -250,6 +252,9 @@ namespace detail
{
return m_ref->m_desc;
}
// May be used to uniquely identify this object internally
const void *ptr() const { return m_ref->ptr(); }
};
} // namespace detail

@ -264,4 +264,27 @@ std::ostream& operator<<(std::ostream& os, const cv::GMetaArg &arg)
return os;
}
} // namespace cv
const void* cv::gimpl::proto::ptr(const GRunArgP &arg)
{
switch (arg.index())
{
#if !defined(GAPI_STANDALONE)
case GRunArgP::index_of<cv::Mat*>():
return static_cast<const void*>(cv::util::get<cv::Mat*>(arg));
case GRunArgP::index_of<cv::Scalar*>():
return static_cast<const void*>(cv::util::get<cv::Scalar*>(arg));
case GRunArgP::index_of<cv::UMat*>():
return static_cast<const void*>(cv::util::get<cv::UMat*>(arg));
#endif
case GRunArgP::index_of<cv::gapi::own::Mat*>():
return static_cast<const void*>(cv::util::get<cv::gapi::own::Mat*>(arg));
case GRunArgP::index_of<cv::detail::VectorRef>():
return cv::util::get<cv::detail::VectorRef>(arg).ptr();
case GRunArgP::index_of<cv::detail::OpaqueRef>():
return cv::util::get<cv::detail::OpaqueRef>(arg).ptr();
default:
util::throw_error(std::logic_error("Unknown GRunArgP type!"));
}
}

@ -28,8 +28,14 @@ GAPI_EXPORTS const GOrigin& origin_of (const GArg &arg);
bool is_dynamic(const GArg &arg);
GProtoArg rewrap (const GArg &arg);
// FIXME:: GAPI_EXPORTS because of tests only!!
GAPI_EXPORTS const void* ptr (const GRunArgP &arg);
} // proto
} // gimpl
} // cv
// FIXME: the gproto.cpp file has more functions that listed here
// where those are declared??
#endif // OPENCV_GAPI_GPROTO_PRIV_HPP

@ -327,13 +327,40 @@ void GIslandExecutable::run(GIslandExecutable::IInput &in, GIslandExecutable::IO
std::vector<OutObj> out_objs;
const auto &in_desc = in.desc();
const auto &out_desc = out.desc();
const auto in_vector = in.get(); // FIXME: passing temporary objects to toRange() leads to issues
const auto in_msg = in.get();
if (cv::util::holds_alternative<cv::gimpl::EndOfStream>(in_msg))
{
out.post(cv::gimpl::EndOfStream{});
return;
}
GAPI_Assert(cv::util::holds_alternative<cv::GRunArgs>(in_msg));
const auto in_vector = cv::util::get<cv::GRunArgs>(in_msg);
in_objs.reserve(in_desc.size());
out_objs.reserve(out_desc.size());
for (auto &&it: ade::util::zip(ade::util::toRange(in_desc),
ade::util::toRange(in_vector)))
{
in_objs.emplace_back(std::get<0>(it), std::get<1>(it));
// FIXME: Not every Island expects a cv::Mat instead of own::Mat on input
// This kludge should go as a result of de-ownification
const cv::GRunArg& in_data_orig = std::get<1>(it);
cv::GRunArg in_data;
#if !defined(GAPI_STANDALONE)
switch (in_data_orig.index())
{
case cv::GRunArg::index_of<cv::Mat>():
in_data = cv::GRunArg{cv::to_own(cv::util::get<cv::Mat>(in_data_orig))};
break;
case cv::GRunArg::index_of<cv::Scalar>():
in_data = cv::GRunArg{(cv::util::get<cv::Scalar>(in_data_orig))};
break;
default:
in_data = in_data_orig;
break;
}
#else
in_data = in_data_orig;
#endif // GAPI_STANDALONE
in_objs.emplace_back(std::get<0>(it), std::move(in_data));
}
for (auto &&it: ade::util::indexed(ade::util::toRange(out_desc)))
{

@ -123,6 +123,24 @@ public:
virtual bool canReshape() const = 0;
virtual void reshape(ade::Graph& g, const GCompileArgs& args) = 0;
// This method is called when the GStreamingCompiled gets a new
// input source to process. Normally this method is called once
// per stream execution.
//
// The idea of this method is to reset backend's stream-associated
// internal state, if there is any.
//
// The regular GCompiled invocation doesn't call this, there may
// be reset() introduced there but it is completely unnecessary at
// this moment.
//
// FIXME: The design on this and so-called "stateful" kernels is not
// closed yet.
// FIXME: This thing will likely break stuff once we introduce
// "multi-source streaming", a better design needs to be proposed
// at that stage.
virtual void handleNewStream() {}; // do nothing here by default
virtual ~GIslandExecutable() = default;
};
@ -133,15 +151,18 @@ public:
void set(const std::vector<cv::gimpl::RcDesc> &newd) { d = newd; }
const std::vector<cv::gimpl::RcDesc> &desc() const { return d; }
};
struct EndOfStream {};
using StreamMsg = cv::util::variant<EndOfStream, cv::GRunArgs>;
struct GIslandExecutable::IInput: public GIslandExecutable::IODesc {
virtual ~IInput() = default;
virtual cv::GRunArgs get() = 0; // Get a new input vector (blocking)
virtual cv::GRunArgs try_get() = 0; // Get a new input vector (non-blocking)
virtual StreamMsg get() = 0; // Get a new input vector (blocking)
virtual StreamMsg try_get() = 0; // Get a new input vector (non-blocking)
};
struct GIslandExecutable::IOutput: public GIslandExecutable::IODesc {
virtual ~IOutput() = default;
virtual GRunArgP get(int idx) = 0; // Allocate (wrap) a new data object for output idx
virtual void post(GRunArgP&&) = 0; // Release the object back to the framework (mark available)
virtual void post(EndOfStream&&) = 0; // Post end-of-stream marker back to the framework
};
// GIslandEmitter - a backend-specific thing which feeds data into

@ -126,13 +126,13 @@ void cv::gimpl::GExecutor::initResource(const ade::NodeHandle &orig_nh)
class cv::gimpl::GExecutor::Input final: public cv::gimpl::GIslandExecutable::IInput
{
cv::gimpl::Mag &mag;
virtual cv::GRunArgs get() override
virtual StreamMsg get() override
{
cv::GRunArgs res;
for (const auto &rc : desc()) { res.emplace_back(magazine::getArg(mag, rc)); }
return res;
return StreamMsg{std::move(res)};
}
virtual cv::GRunArgs try_get() override { return get(); }
virtual StreamMsg try_get() override { return get(); }
public:
Input(cv::gimpl::Mag &m, const std::vector<RcDesc> &rcs) : mag(m) { set(rcs); }
};
@ -142,6 +142,7 @@ class cv::gimpl::GExecutor::Output final: public cv::gimpl::GIslandExecutable::I
cv::gimpl::Mag &mag;
virtual GRunArgP get(int idx) override { return magazine::getObjPtr(mag, desc()[idx]); }
virtual void post(GRunArgP&&) override { } // Do nothing here
virtual void post(EndOfStream&&) override {} // Do nothing here too
public:
Output(cv::gimpl::Mag &m, const std::vector<RcDesc> &rcs) : mag(m) { set(rcs); }
};

@ -12,11 +12,13 @@
#include <opencv2/gapi/opencv_includes.hpp>
#include "executor/gstreamingexecutor.hpp"
#include "api/gproto_priv.hpp" // ptr(GRunArgP)
#include "compiler/passes/passes.hpp"
#include "backends/common/gbackend.hpp" // createMat
#include "compiler/gcompiler.hpp" // for compileIslands
#include "executor/gstreamingexecutor.hpp"
namespace
{
using namespace cv::gimpl::stream;
@ -362,6 +364,208 @@ void emitterActorThread(std::shared_ptr<cv::gimpl::GIslandEmitter> emitter,
}
}
class StreamingInput final: public cv::gimpl::GIslandExecutable::IInput
{
QueueReader &qr;
std::vector<Q*> &in_queues; // FIXME: This can be part of QueueReader
cv::GRunArgs &in_constants; // FIXME: This can be part of QueueReader
virtual cv::gimpl::StreamMsg get() override
{
cv::GRunArgs isl_input_args;
if (!qr.getInputVector(in_queues, in_constants, isl_input_args))
{
// Stop case
return cv::gimpl::StreamMsg{cv::gimpl::EndOfStream{}};
}
return cv::gimpl::StreamMsg{std::move(isl_input_args)};
}
virtual cv::gimpl::StreamMsg try_get() override
{
// FIXME: This is not very usable at the moment!
return get();
}
public:
explicit StreamingInput(QueueReader &rdr,
std::vector<Q*> &inq,
cv::GRunArgs &inc,
const std::vector<cv::gimpl::RcDesc> &in_descs)
: qr(rdr), in_queues(inq), in_constants(inc)
{
set(in_descs);
}
};
class StreamingOutput final: public cv::gimpl::GIslandExecutable::IOutput
{
// These objects form an internal state of the StreamingOutput
struct Posting
{
using V = cv::util::variant<cv::GRunArg, cv::gimpl::EndOfStream>;
V data;
bool ready = false;
};
using PostingList = std::list<Posting>;
std::vector<PostingList> m_postings;
std::unordered_map< const void*
, std::pair<int, PostingList::iterator>
> m_postIdx;
std::size_t m_stops_sent = 0u;
// These objects are owned externally
const cv::GMetaArgs &m_metas;
std::vector< std::vector<Q*> > &m_out_queues;
// Allocate a new data object for output under idx
// Prepare this object for posting
virtual cv::GRunArgP get(int idx) override
{
#if !defined(GAPI_STANDALONE)
using MatType = cv::Mat;
using SclType = cv::Scalar;
#else
using MatType = cv::gapi::own::Mat;
using SclType = cv::gapi::own::Scalar;
#endif // GAPI_STANDALONE
// Allocate a new posting first, then bind this GRunArgP to this item
auto iter = m_postings[idx].insert(m_postings[idx].end(), Posting{});
const auto r = desc()[idx];
cv::GRunArg& out_arg = cv::util::get<cv::GRunArg>(iter->data);
cv::GRunArgP ret_val;
switch (r.shape) {
// Allocate a data object based on its shape & meta, and put it into our vectors.
// Yes, first we put a cv::Mat GRunArg, and then specify _THAT_
// pointer as an output parameter - to make sure that after island completes,
// our GRunArg still has the right (up-to-date) value.
// Same applies to other types.
// FIXME: This is absolutely ugly but seem to work perfectly for its purpose.
case cv::GShape::GMAT:
{
MatType newMat;
cv::gimpl::createMat(cv::util::get<cv::GMatDesc>(m_metas[idx]), newMat);
out_arg = cv::GRunArg(std::move(newMat));
ret_val = cv::GRunArgP(&cv::util::get<MatType>(out_arg));
}
break;
case cv::GShape::GSCALAR:
{
SclType newScl;
out_arg = cv::GRunArg(std::move(newScl));
ret_val = cv::GRunArgP(&cv::util::get<SclType>(out_arg));
}
break;
case cv::GShape::GARRAY:
{
cv::detail::VectorRef newVec;
cv::util::get<cv::detail::ConstructVec>(r.ctor)(newVec);
out_arg = cv::GRunArg(std::move(newVec));
// VectorRef is implicitly shared so no pointer is taken here
// FIXME: that variant MOVE problem again
const auto &rr = cv::util::get<cv::detail::VectorRef>(out_arg);
ret_val = cv::GRunArgP(rr);
}
break;
case cv::GShape::GOPAQUE:
{
cv::detail::OpaqueRef newOpaque;
cv::util::get<cv::detail::ConstructOpaque>(r.ctor)(newOpaque);
out_arg = cv::GRunArg(std::move(newOpaque));
// OpaqueRef is implicitly shared so no pointer is taken here
// FIXME: that variant MOVE problem again
const auto &rr = cv::util::get<cv::detail::OpaqueRef>(out_arg);
ret_val = cv::GRunArgP(rr);
}
break;
default:
cv::util::throw_error(std::logic_error("Unsupported GShape"));
}
m_postIdx[cv::gimpl::proto::ptr(ret_val)] = std::make_pair(idx, iter);
return ret_val;
}
virtual void post(cv::GRunArgP&& argp) override
{
// Mark the output ready for posting. If it is the first in the line,
// actually post it and all its successors which are ready for posting too.
auto it = m_postIdx.find(cv::gimpl::proto::ptr(argp));
GAPI_Assert(it != m_postIdx.end());
const int out_idx = it->second.first;
const auto out_iter = it->second.second;
out_iter->ready = true;
m_postIdx.erase(it); // Drop the link from the cache anyway
if (out_iter != m_postings[out_idx].begin())
{
return; // There are some pending postings in the beginning, return
}
GAPI_Assert(out_iter == m_postings[out_idx].begin());
auto post_iter = m_postings[out_idx].begin();
while (post_iter != m_postings[out_idx].end() && post_iter->ready == true)
{
Cmd cmd;
if (cv::util::holds_alternative<cv::GRunArg>(post_iter->data))
{
// FIXME: That ugly VARIANT problem
cmd = Cmd{const_cast<const cv::GRunArg&>(cv::util::get<cv::GRunArg>(post_iter->data))};
}
else
{
GAPI_Assert(cv::util::holds_alternative<cv::gimpl::EndOfStream>(post_iter->data));
cmd = Cmd{Stop{}};
m_stops_sent++;
}
for (auto &&q : m_out_queues[out_idx])
{
// FIXME: This ugly VARIANT problem
q->push(const_cast<const Cmd&>(cmd));
}
post_iter = m_postings[out_idx].erase(post_iter);
}
}
virtual void post(cv::gimpl::EndOfStream&&) override
{
// If the posting list is empty, just broadcast the stop message.
// If it is not, enqueue the Stop message in the postings list.
for (auto &&it : ade::util::indexed(m_postings))
{
const auto idx = ade::util::index(it);
auto &lst = ade::util::value(it);
if (lst.empty())
{
for (auto &&q : m_out_queues[idx])
{
q->push(Cmd(Stop{}));
}
m_stops_sent++;
}
else
{
Posting p;
p.data = Posting::V{cv::gimpl::EndOfStream{}};
p.ready = true;
lst.push_back(std::move(p)); // FIXME: For some reason {}-ctor didn't work here
}
}
}
public:
explicit StreamingOutput(const cv::GMetaArgs &metas,
std::vector< std::vector<Q*> > &out_queues,
const std::vector<cv::gimpl::RcDesc> &out_descs)
: m_metas(metas)
, m_out_queues(out_queues)
{
set(out_descs);
m_postings.resize(out_descs.size());
}
bool done() const
{
// The streaming actor work is considered DONE for this stream
// when it posted/resent all STOP messages to all its outputs.
return m_stops_sent == desc().size();
}
};
// This thread is a plain dumb processing actor. What it do is just:
// - Reads input from the input queue(s), sleeps if there's nothing to read
// - Once a full input vector is obtained, passes it to the underlying island
@ -380,133 +584,11 @@ void islandActorThread(std::vector<cv::gimpl::RcDesc> in_rcs, //
GAPI_Assert(out_queues.size() == out_rcs.size());
GAPI_Assert(out_queues.size() == out_metas.size());
QueueReader qr;
while (true)
StreamingInput input(qr, in_queues, in_constants, in_rcs);
StreamingOutput output(out_metas, out_queues, out_rcs);
while (!output.done())
{
std::vector<cv::gimpl::GIslandExecutable::InObj> isl_inputs;
isl_inputs.resize(in_rcs.size());
cv::GRunArgs isl_input_args;
if (!qr.getInputVector(in_queues, in_constants, isl_input_args))
{
// Stop received -- broadcast Stop down to the pipeline and quit
for (auto &&out_qq : out_queues)
{
for (auto &&out_q : out_qq) out_q->push(Cmd{Stop{}});
}
return;
}
GAPI_Assert(isl_inputs.size() == isl_input_args.size());
for (auto &&it : ade::util::indexed(ade::util::zip(ade::util::toRange(in_rcs),
ade::util::toRange(isl_inputs),
ade::util::toRange(isl_input_args))))
{
const auto &value = ade::util::value(it);
const auto &in_rc = std::get<0>(value);
auto &isl_input = std::get<1>(value);
const auto &in_arg = std::get<2>(value); // FIXME: MOVE PROBLEM
isl_input.first = in_rc;
#if defined(GAPI_STANDALONE)
// Standalone mode - simply store input argument in the vector as-is
auto id = ade::util::index(it);
isl_inputs[id].second = in_arg;
#else
// Make Islands operate on own:: data types (i.e. in the same
// environment as GExecutor provides)
// This way several backends (e.g. Fluid) remain OpenCV-independent.
switch (in_arg.index()) {
case cv::GRunArg::index_of<cv::Mat>():
isl_input.second = cv::GRunArg{cv::to_own(cv::util::get<cv::Mat>(in_arg))};
break;
case cv::GRunArg::index_of<cv::Scalar>():
isl_input.second = cv::GRunArg{cv::util::get<cv::Scalar>(in_arg)};
break;
default:
isl_input.second = in_arg;
break;
}
#endif // GAPI_STANDALONE
}
// Once the vector is obtained, prepare data for island execution
// Note - we first allocate output vector via GRunArg!
// Then it is converted to a GRunArgP.
std::vector<cv::gimpl::GIslandExecutable::OutObj> isl_outputs;
cv::GRunArgs out_data;
isl_outputs.resize(out_rcs.size());
out_data.resize(out_rcs.size());
for (auto &&it : ade::util::indexed(out_rcs))
{
auto id = ade::util::index(it);
auto &r = ade::util::value(it);
#if !defined(GAPI_STANDALONE)
using MatType = cv::Mat;
using SclType = cv::Scalar;
#else
using MatType = cv::gapi::own::Mat;
using SclType = cv::Scalar;
#endif // GAPI_STANDALONE
switch (r.shape) {
// Allocate a data object based on its shape & meta, and put it into our vectors.
// Yes, first we put a cv::Mat GRunArg, and then specify _THAT_
// pointer as an output parameter - to make sure that after island completes,
// our GRunArg still has the right (up-to-date) value.
// Same applies to other types.
// FIXME: This is absolutely ugly but seem to work perfectly for its purpose.
case cv::GShape::GMAT:
{
MatType newMat;
cv::gimpl::createMat(cv::util::get<cv::GMatDesc>(out_metas[id]), newMat);
out_data[id] = cv::GRunArg(std::move(newMat));
isl_outputs[id] = { r, cv::GRunArgP(&cv::util::get<MatType>(out_data[id])) };
}
break;
case cv::GShape::GSCALAR:
{
SclType newScl;
out_data[id] = cv::GRunArg(std::move(newScl));
isl_outputs[id] = { r, cv::GRunArgP(&cv::util::get<SclType>(out_data[id])) };
}
break;
case cv::GShape::GARRAY:
{
cv::detail::VectorRef newVec;
cv::util::get<cv::detail::ConstructVec>(r.ctor)(newVec);
out_data[id] = cv::GRunArg(std::move(newVec));
// VectorRef is implicitly shared so no pointer is taken here
const auto &rr = cv::util::get<cv::detail::VectorRef>(out_data[id]); // FIXME: that variant MOVE problem again
isl_outputs[id] = { r, cv::GRunArgP(rr) };
}
break;
case cv::GShape::GOPAQUE:
{
cv::detail::OpaqueRef newOpaque;
cv::util::get<cv::detail::ConstructOpaque>(r.ctor)(newOpaque);
out_data[id] = cv::GRunArg(std::move(newOpaque));
// OpaqueRef is implicitly shared so no pointer is taken here
const auto &rr = cv::util::get<cv::detail::OpaqueRef>(out_data[id]); // FIXME: that variant MOVE problem again
isl_outputs[id] = { r, cv::GRunArgP(rr) };
}
break;
default:
cv::util::throw_error(std::logic_error("Unsupported GShape"));
break;
}
}
// Now ask Island to execute on this data
island->run(std::move(isl_inputs), std::move(isl_outputs));
// Once executed, dispatch our results down to the pipeline.
for (auto &&it : ade::util::zip(ade::util::toRange(out_queues),
ade::util::toRange(out_data)))
{
for (auto &&q : std::get<0>(it))
{
// FIXME: FATAL VARIANT ISSUE!!
const auto tmp = std::get<1>(it);
q->push(Cmd{tmp});
}
}
island->run(input, output);
}
}
@ -859,6 +941,7 @@ void cv::gimpl::GStreamingExecutor::setSource(GRunArgs &&ins)
for (auto &&out_eh : op.nh->outNodes()) {
out_queues.push_back(reader_queues(*m_island_graph, out_eh));
}
op.isl_exec->handleNewStream();
m_threads.emplace_back(islandActorThread,
op.in_objects,

@ -0,0 +1,34 @@
// This file is part of OpenCV project.
// It is subject to the license terms in the LICENSE file found in the top-level directory
// of this distribution and at http://opencv.org/license.html.
//
// Copyright (C) 2020 Intel Corporation
#include "../test_precomp.hpp"
#include "../src/api/gproto_priv.hpp"
namespace opencv_test {
template<typename T>
struct ProtoPtrTest : public ::testing::Test { using Type = T; };
using ProtoPtrTestTypes = ::testing::Types< cv::Mat
, cv::UMat
, cv::gapi::own::Mat
, cv::Scalar
, std::vector<int>
, int
>;
TYPED_TEST_CASE(ProtoPtrTest, ProtoPtrTestTypes);
TYPED_TEST(ProtoPtrTest, NonZero)
{
typename TestFixture::Type value;
const auto arg = cv::gout(value).front();
const auto ptr = cv::gimpl::proto::ptr(arg);
EXPECT_EQ(ptr, &value);
}
} // namespace opencv_test

@ -815,6 +815,10 @@ struct GAPI_Streaming_Unit: public ::testing::Test {
}
};
// FIXME: (GAPI_Streaming_Types, InputOpaque) test is missing here!
// FIXME: (GAPI_Streaming_Types, XChangeOpaque) test is missing here!
// FIXME: (GAPI_Streaming_Types, OutputOpaque) test is missing here!
TEST_F(GAPI_Streaming_Unit, TestTwoVideoSourcesFail)
{
const auto c_ptr = gapi::wip::make_src<cv::gapi::wip::GCaptureSource>(findDataFile("cv/video/768x576.avi"));

Loading…
Cancel
Save