Merge pull request #24178 from dmatveev:dm/streaming_queue

G-API: Introduce a Queue Source #24178

- Added a new IStreamSource class: in fact, a wrapper over a concurrent queue;
- Added minimal example on how it can be used;
- Extended IStreamSource with optional "halt" interface to break the blocking calls in the emitter threads when required to stop.
- Introduced a QueueInput class which allows to pass the whole graph's input vector at once. In fact it is a thin wrapper atop of individual Queue Sources.

There is a hidden trap found with our type system as described in https://github.com/orgs/g-api-org/discussions/2

While it works even in this form, it should be addressed somewhere in the 5.0 timeframe.

### Pull Request Readiness Checklist

See details at https://github.com/opencv/opencv/wiki/How_to_contribute#making-a-good-pull-request

- [x] I agree to contribute to the project under Apache 2 License.
- [x] To the best of my knowledge, the proposed patch is not based on a code under GPL or another license that is incompatible with OpenCV
- [x] The PR is proposed to the proper branch
- [ ] There is a reference to the original bug report and related work
- [x] There is accuracy test, performance test and test data in opencv_extra repository, if applicable
      Patch to opencv_extra has the same branch name.
- [x] The feature is well documented and sample code can be built with the project CMake
pull/24221/head
Dmitry Matveev 1 year ago committed by GitHub
parent d3bccd7b23
commit d19fc1264b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 3
      modules/gapi/CMakeLists.txt
  2. 6
      modules/gapi/include/opencv2/gapi/gtype_traits.hpp
  3. 67
      modules/gapi/include/opencv2/gapi/streaming/queue_source.hpp
  4. 7
      modules/gapi/include/opencv2/gapi/streaming/source.hpp
  5. 1
      modules/gapi/src/compiler/gislandmodel.hpp
  6. 13
      modules/gapi/src/executor/gstreamingexecutor.cpp
  7. 98
      modules/gapi/src/streaming/queue_source.cpp
  8. 127
      modules/gapi/test/streaming/gapi_streaming_queue_source_tests.cpp

@ -190,6 +190,9 @@ set(gapi_srcs
src/backends/ov/bindings_ov.cpp
src/backends/python/gpythonbackend.cpp
# Queue Streaming source
src/streaming/queue_source.cpp
# OpenVPL Streaming source
src/streaming/onevpl/source.cpp
src/streaming/onevpl/source_priv.cpp

@ -141,8 +141,10 @@ namespace detail
template<typename U> struct GTypeOf<std::vector<U> > { using type = cv::GArray<U>; };
template<typename U> struct GTypeOf { using type = cv::GOpaque<U>;};
template<> struct GTypeOf<cv::MediaFrame> { using type = cv::GFrame; };
// FIXME: This is not quite correct since IStreamSource may produce not only Mat but also Scalar
// and vector data. TODO: Extend the type dispatching on these types too.
// FIXME: This is not quite correct since IStreamSource may
// produce not only Mat but also MediaFrame, Scalar and vector
// data. TODO: Extend the type dispatching on these types too.
template<> struct GTypeOf<cv::gapi::wip::IStreamSource::Ptr> { using type = cv::GMat;};
template<class T> using g_type_of_t = typename GTypeOf<T>::type;

@ -0,0 +1,67 @@
// This file is part of OpenCV project.
// It is subject to the license terms in the LICENSE file found in the top-level directory
// of this distribution and at http://opencv.org/license.html.
//
// Copyright (C) 2023 Intel Corporation
#ifndef OPENCV_GAPI_STREAMING_QUEUE_SOURCE_HPP
#define OPENCV_GAPI_STREAMING_QUEUE_SOURCE_HPP
#include <memory> // shared_ptr
#include <type_traits> // is_base_of
#include <opencv2/gapi/garg.hpp> // GRunArgs
#include <opencv2/gapi/gmetaarg.hpp> // GMetaArg + all descr_of
#include <opencv2/gapi/streaming/source.hpp> // IStreamSource
namespace cv {
namespace gapi {
namespace wip {
struct Data; // fwd-declare to avoid circular? header dependencies
class GAPI_EXPORTS QueueSourceBase: public cv::gapi::wip::IStreamSource {
class Priv;
std::shared_ptr<Priv> m_priv;
// FIXME: Need to understand how it works with IStreamSource's shared_from_this
// Can we avoid having too many shared_ptrs here?
public:
explicit QueueSourceBase(const cv::GMetaArg &m);
void push(Data &&data);
virtual bool pull(Data &data) override;
virtual void halt() override;
virtual GMetaArg descr_of() const override;
virtual ~QueueSourceBase() = default;
};
/**
* @brief Queued streaming pipeline source.
*
*/
template<class T>
class QueueSource final: public QueueSourceBase
{
public:
using Meta = decltype(cv::descr_of(T{}));
explicit QueueSource(Meta m) : QueueSourceBase(GMetaArg{m}) {
}
void push(T t) {
QueueSourceBase::push(Data{t});
}
};
class GAPI_EXPORTS QueueInput {
std::vector<std::shared_ptr<QueueSourceBase> > m_sources;
public:
explicit QueueInput(const cv::GMetaArgs &args);
void push(cv::GRunArgs &&ins);
operator cv::GRunArgs();
};
} // namespace wip
} // namespace gapi
} // namespace cv
#endif // OPENCV_GAPI_STREAMING_SOURCE_HPP

@ -16,7 +16,7 @@
namespace cv {
namespace gapi {
namespace wip {
struct Data; // "forward-declaration" of GRunArg
struct Data; // forward-declaration of Data to avoid circular dependencies
/**
* @brief Abstract streaming pipeline source.
@ -43,6 +43,11 @@ public:
Ptr ptr() { return shared_from_this(); }
virtual bool pull(Data &data) = 0;
virtual GMetaArg descr_of() const = 0;
virtual void halt() {
// Do nothing by default to maintain compatibility with the existing sources...
// In fact needs to be decorated atop of the child classes to maintain the behavior
// FIXME: Make it mandatory in OpenCV 5.0
};
virtual ~IStreamSource() = default;
};

@ -192,6 +192,7 @@ class GIslandEmitter
public:
// Obtain next value from the emitter
virtual bool pull(GRunArg &) = 0;
virtual void halt() = 0;
virtual ~GIslandEmitter() = default;
};

@ -41,6 +41,10 @@ using namespace cv::gimpl::stream;
class VideoEmitter final: public cv::gimpl::GIslandEmitter {
cv::gapi::wip::IStreamSource::Ptr src;
virtual void halt() override {
src->halt();
}
virtual bool pull(cv::GRunArg &arg) override {
// FIXME: probably we can maintain a pool of (then) pre-allocated
// buffers to avoid runtime allocations.
@ -62,6 +66,10 @@ public:
class ConstEmitter final: public cv::gimpl::GIslandEmitter {
cv::GRunArg m_arg;
virtual void halt() override {
// Not used here, but in fact can be used.
}
virtual bool pull(cv::GRunArg &arg) override {
arg = const_cast<const cv::GRunArg&>(m_arg); // FIXME: variant workaround
return true;
@ -1918,6 +1926,11 @@ void cv::gimpl::GStreamingExecutor::stop()
for (auto &q : m_emitter_queues) {
q.push(stream::Cmd{stream::Stop{}});
}
// Also kindly ask emitter object to halt to break the blocking src->pull()
// loop
for (auto &nh : m_emitters) {
m_gim.metadata(nh).get<Emitter>().object->halt();
}
// Pull messages from the final queue to ensure completion
Cmd cmd;

@ -0,0 +1,98 @@
// This file is part of OpenCV project.
// It is subject to the license terms in the LICENSE file found in the top-level directory
// of this distribution and at http://opencv.org/license.html.
//
// Copyright (C) 2023 Intel Corporation
#include <chrono>
#include <atomic>
#include <ade/util/zip_range.hpp>
#include <opencv2/gapi/streaming/queue_source.hpp>
#include <opencv2/gapi/streaming/meta.hpp>
#include "executor/conc_queue.hpp"
namespace cv {
namespace gapi {
namespace wip {
class QueueSourceBase::Priv {
public:
explicit Priv(const cv::GMetaArg &meta) {
m = meta;
halted = false;
}
cv::GMetaArg m;
cv::gapi::own::concurrent_bounded_queue<cv::GRunArg> q;
int64_t c = 0;
std::atomic<bool> halted;
};
QueueSourceBase::QueueSourceBase(const cv::GMetaArg &m)
: m_priv(new Priv(m)) {
}
void QueueSourceBase::push(Data &&data) {
// Tag data with seq_id/ts
const auto now = std::chrono::system_clock::now();
const auto dur = std::chrono::duration_cast<std::chrono::microseconds>
(now.time_since_epoch());
data.meta[cv::gapi::streaming::meta_tag::timestamp] = int64_t{dur.count()};
data.meta[cv::gapi::streaming::meta_tag::seq_id] = int64_t{m_priv->c++};
m_priv->q.push(data);
}
bool QueueSourceBase::pull(Data &data) {
m_priv->q.pop(data);
if (m_priv->halted) {
return false;
}
return true;
}
void QueueSourceBase::halt() {
m_priv->halted.store(true);
m_priv->q.push(cv::GRunArg{});
}
cv::GMetaArg QueueSourceBase::descr_of() const {
return m_priv->m;
}
QueueInput::QueueInput(const cv::GMetaArgs &args) {
for (auto &&m : args) {
m_sources.emplace_back(new cv::gapi::wip::QueueSourceBase(m));
}
}
void QueueInput::push(cv::GRunArgs &&args) {
GAPI_Assert(m_sources.size() == args.size());
for (auto && it : ade::util::zip(ade::util::toRange(m_sources),
ade::util::toRange(args)))
{
auto &src = std::get<0>(it);
auto &obj = std::get<1>(it);
Data d;
d = obj;
src->push(std::move(d));
}
}
QueueInput::operator cv::GRunArgs () {
cv::GRunArgs args;
for (auto &&s : m_sources) {
args.push_back(s->ptr());
}
return args;
}
} // wip
} // gapi
} // cv

@ -0,0 +1,127 @@
// This file is part of OpenCV project.
// It is subject to the license terms in the LICENSE file found in the top-level directory
// of this distribution and at http://opencv.org/license.html.
//
// Copyright (C) 2023 Intel Corporation
#include "../test_precomp.hpp"
#include <opencv2/gapi/gstreaming.hpp>
#include <opencv2/gapi/streaming/queue_source.hpp>
#include <opencv2/gapi/streaming/cap.hpp>
namespace opencv_test
{
TEST(GAPI_Streaming_Queue_Source, SmokeTest) {
// This is more like an example on G-API Queue Source
cv::GMat in;
cv::GMat out = in + 1;
cv::GStreamingCompiled comp = cv::GComputation(in, out).compileStreaming();
// Queue source needs to know format information to maintain contracts
auto src = std::make_shared<cv::gapi::wip::QueueSource<cv::Mat> >
(cv::GMatDesc{CV_8U, 1, cv::Size{128, 128}});
comp.setSource(cv::gin(src->ptr()));
comp.start();
// It is perfectly legal to start a pipeline at this point - the source was passed.
// Now we can push data through the source and get the pipeline results.
cv::Mat eye = cv::Mat::eye(cv::Size{128, 128}, CV_8UC1);
src->push(eye); // Push I (identity matrix)
src->push(eye*2); // Push I*2
// Now its time to pop. The data could be already processed at this point.
// Note the queue source queues are unbounded to avoid deadlocks
cv::Mat result;
ASSERT_TRUE(comp.pull(cv::gout(result)));
EXPECT_EQ(0, cvtest::norm(eye + 1, result, NORM_INF));
ASSERT_TRUE(comp.pull(cv::gout(result)));
EXPECT_EQ(0, cvtest::norm(eye*2 + 1, result, NORM_INF));
}
TEST(GAPI_Streaming_Queue_Source, Mixed) {
// Mixing a regular "live" source (which runs on its own) with a
// manually controlled queue source may make a little sense, but
// is perfectly legal and possible.
cv::GMat in1;
cv::GMat in2;
cv::GMat out = in2 - in1;
cv::GStreamingCompiled comp = cv::GComputation(in1, in2, out).compileStreaming();
// Queue source needs to know format information to maintain contracts
auto src1 = std::make_shared<cv::gapi::wip::QueueSource<cv::Mat> >
(cv::GMatDesc{CV_8U, 3, cv::Size{768, 576}});
std::shared_ptr<cv::gapi::wip::IStreamSource> src2;
auto path = findDataFile("cv/video/768x576.avi");
try {
src2 = cv::gapi::wip::make_src<cv::gapi::wip::GCaptureSource>(path);
} catch(...) {
throw SkipTestException("Video file can not be opened");
}
comp.setSource(cv::gin(src1->ptr(), src2)); // FIXME: quite inconsistent
comp.start();
cv::Mat eye = cv::Mat::eye(cv::Size{768, 576}, CV_8UC3);
src1->push(eye); // Push I (identity matrix)
src1->push(eye); // Push I (again)
cv::Mat ref, result;
cv::VideoCapture cap(path);
cap >> ref;
ASSERT_TRUE(comp.pull(cv::gout(result)));
EXPECT_EQ(0, cvtest::norm(ref - eye, result, NORM_INF));
cap >> ref;
ASSERT_TRUE(comp.pull(cv::gout(result)));
EXPECT_EQ(0, cvtest::norm(ref - eye, result, NORM_INF));
}
TEST(GAPI_Streaming_Queue_Input, SmokeTest) {
// Queue Input: a tiny wrapper atop of multiple queue sources.
// Allows users to pass all input data at once.
cv::GMat in1;
cv::GScalar in2;
cv::GMat out = in1 + in2;
cv::GStreamingCompiled comp = cv::GComputation(cv::GIn(in1, in2), cv::GOut(out))
.compileStreaming();
// FIXME: This API is too raw
cv::gapi::wip::QueueInput input({
cv::GMetaArg{ cv::GMatDesc{CV_8U, 1, cv::Size{64,64} } },
cv::GMetaArg{ cv::empty_scalar_desc() }
});
comp.setSource(input); // Implicit conversion allows it to be passed as-is.
comp.start();
// Push data via queue input
cv::Mat eye = cv::Mat::eye(cv::Size{64, 64}, CV_8UC1);
input.push(cv::gin(eye, cv::Scalar(1)));
input.push(cv::gin(eye, cv::Scalar(2)));
input.push(cv::gin(eye, cv::Scalar(3)));
// Pop data and validate
cv::Mat result;
ASSERT_TRUE(comp.pull(cv::gout(result)));
EXPECT_EQ(0, cvtest::norm(eye+1, result, NORM_INF));
ASSERT_TRUE(comp.pull(cv::gout(result)));
EXPECT_EQ(0, cvtest::norm(eye+2, result, NORM_INF));
ASSERT_TRUE(comp.pull(cv::gout(result)));
EXPECT_EQ(0, cvtest::norm(eye+3, result, NORM_INF));
}
} // namespace opencv_test
Loading…
Cancel
Save