diff --git a/modules/gapi/CMakeLists.txt b/modules/gapi/CMakeLists.txt index e30cb77e9e..2caeb02ae2 100644 --- a/modules/gapi/CMakeLists.txt +++ b/modules/gapi/CMakeLists.txt @@ -190,6 +190,9 @@ set(gapi_srcs src/backends/ov/bindings_ov.cpp src/backends/python/gpythonbackend.cpp + # Queue Streaming source + src/streaming/queue_source.cpp + # OpenVPL Streaming source src/streaming/onevpl/source.cpp src/streaming/onevpl/source_priv.cpp diff --git a/modules/gapi/include/opencv2/gapi/gtype_traits.hpp b/modules/gapi/include/opencv2/gapi/gtype_traits.hpp index b56175788f..a1703a52cb 100644 --- a/modules/gapi/include/opencv2/gapi/gtype_traits.hpp +++ b/modules/gapi/include/opencv2/gapi/gtype_traits.hpp @@ -141,8 +141,10 @@ namespace detail template struct GTypeOf > { using type = cv::GArray; }; template struct GTypeOf { using type = cv::GOpaque;}; template<> struct GTypeOf { using type = cv::GFrame; }; - // FIXME: This is not quite correct since IStreamSource may produce not only Mat but also Scalar - // and vector data. TODO: Extend the type dispatching on these types too. + + // FIXME: This is not quite correct since IStreamSource may + // produce not only Mat but also MediaFrame, Scalar and vector + // data. TODO: Extend the type dispatching on these types too. template<> struct GTypeOf { using type = cv::GMat;}; template using g_type_of_t = typename GTypeOf::type; diff --git a/modules/gapi/include/opencv2/gapi/streaming/queue_source.hpp b/modules/gapi/include/opencv2/gapi/streaming/queue_source.hpp new file mode 100644 index 0000000000..bd385ed16e --- /dev/null +++ b/modules/gapi/include/opencv2/gapi/streaming/queue_source.hpp @@ -0,0 +1,67 @@ +// This file is part of OpenCV project. +// It is subject to the license terms in the LICENSE file found in the top-level directory +// of this distribution and at http://opencv.org/license.html. +// +// Copyright (C) 2023 Intel Corporation + +#ifndef OPENCV_GAPI_STREAMING_QUEUE_SOURCE_HPP +#define OPENCV_GAPI_STREAMING_QUEUE_SOURCE_HPP + +#include // shared_ptr +#include // is_base_of + +#include // GRunArgs +#include // GMetaArg + all descr_of +#include // IStreamSource + +namespace cv { +namespace gapi { +namespace wip { +struct Data; // fwd-declare to avoid circular? header dependencies + +class GAPI_EXPORTS QueueSourceBase: public cv::gapi::wip::IStreamSource { + class Priv; + std::shared_ptr m_priv; + // FIXME: Need to understand how it works with IStreamSource's shared_from_this + // Can we avoid having too many shared_ptrs here? + +public: + explicit QueueSourceBase(const cv::GMetaArg &m); + void push(Data &&data); + virtual bool pull(Data &data) override; + virtual void halt() override; + virtual GMetaArg descr_of() const override; + virtual ~QueueSourceBase() = default; +}; + +/** + * @brief Queued streaming pipeline source. + * + */ +template +class QueueSource final: public QueueSourceBase +{ +public: + using Meta = decltype(cv::descr_of(T{})); + explicit QueueSource(Meta m) : QueueSourceBase(GMetaArg{m}) { + } + void push(T t) { + QueueSourceBase::push(Data{t}); + } +}; + +class GAPI_EXPORTS QueueInput { + std::vector > m_sources; + +public: + explicit QueueInput(const cv::GMetaArgs &args); + + void push(cv::GRunArgs &&ins); + operator cv::GRunArgs(); +}; + +} // namespace wip +} // namespace gapi +} // namespace cv + +#endif // OPENCV_GAPI_STREAMING_SOURCE_HPP diff --git a/modules/gapi/include/opencv2/gapi/streaming/source.hpp b/modules/gapi/include/opencv2/gapi/streaming/source.hpp index 6597cad8f8..267469ad1b 100644 --- a/modules/gapi/include/opencv2/gapi/streaming/source.hpp +++ b/modules/gapi/include/opencv2/gapi/streaming/source.hpp @@ -16,7 +16,7 @@ namespace cv { namespace gapi { namespace wip { - struct Data; // "forward-declaration" of GRunArg +struct Data; // forward-declaration of Data to avoid circular dependencies /** * @brief Abstract streaming pipeline source. @@ -43,6 +43,11 @@ public: Ptr ptr() { return shared_from_this(); } virtual bool pull(Data &data) = 0; virtual GMetaArg descr_of() const = 0; + virtual void halt() { + // Do nothing by default to maintain compatibility with the existing sources... + // In fact needs to be decorated atop of the child classes to maintain the behavior + // FIXME: Make it mandatory in OpenCV 5.0 + }; virtual ~IStreamSource() = default; }; diff --git a/modules/gapi/src/compiler/gislandmodel.hpp b/modules/gapi/src/compiler/gislandmodel.hpp index 3a1a8d5ab9..ade13a6f33 100644 --- a/modules/gapi/src/compiler/gislandmodel.hpp +++ b/modules/gapi/src/compiler/gislandmodel.hpp @@ -192,6 +192,7 @@ class GIslandEmitter public: // Obtain next value from the emitter virtual bool pull(GRunArg &) = 0; + virtual void halt() = 0; virtual ~GIslandEmitter() = default; }; diff --git a/modules/gapi/src/executor/gstreamingexecutor.cpp b/modules/gapi/src/executor/gstreamingexecutor.cpp index 124b27f39c..6a397faca6 100644 --- a/modules/gapi/src/executor/gstreamingexecutor.cpp +++ b/modules/gapi/src/executor/gstreamingexecutor.cpp @@ -41,6 +41,10 @@ using namespace cv::gimpl::stream; class VideoEmitter final: public cv::gimpl::GIslandEmitter { cv::gapi::wip::IStreamSource::Ptr src; + virtual void halt() override { + src->halt(); + } + virtual bool pull(cv::GRunArg &arg) override { // FIXME: probably we can maintain a pool of (then) pre-allocated // buffers to avoid runtime allocations. @@ -62,6 +66,10 @@ public: class ConstEmitter final: public cv::gimpl::GIslandEmitter { cv::GRunArg m_arg; + virtual void halt() override { + // Not used here, but in fact can be used. + } + virtual bool pull(cv::GRunArg &arg) override { arg = const_cast(m_arg); // FIXME: variant workaround return true; @@ -1918,6 +1926,11 @@ void cv::gimpl::GStreamingExecutor::stop() for (auto &q : m_emitter_queues) { q.push(stream::Cmd{stream::Stop{}}); } + // Also kindly ask emitter object to halt to break the blocking src->pull() + // loop + for (auto &nh : m_emitters) { + m_gim.metadata(nh).get().object->halt(); + } // Pull messages from the final queue to ensure completion Cmd cmd; diff --git a/modules/gapi/src/streaming/queue_source.cpp b/modules/gapi/src/streaming/queue_source.cpp new file mode 100644 index 0000000000..59fde09c44 --- /dev/null +++ b/modules/gapi/src/streaming/queue_source.cpp @@ -0,0 +1,98 @@ +// This file is part of OpenCV project. +// It is subject to the license terms in the LICENSE file found in the top-level directory +// of this distribution and at http://opencv.org/license.html. +// +// Copyright (C) 2023 Intel Corporation + +#include +#include + +#include + +#include +#include + +#include "executor/conc_queue.hpp" + +namespace cv { +namespace gapi { +namespace wip { + +class QueueSourceBase::Priv { +public: + explicit Priv(const cv::GMetaArg &meta) { + m = meta; + halted = false; + } + + cv::GMetaArg m; + cv::gapi::own::concurrent_bounded_queue q; + int64_t c = 0; + std::atomic halted; +}; + +QueueSourceBase::QueueSourceBase(const cv::GMetaArg &m) + : m_priv(new Priv(m)) { +} + +void QueueSourceBase::push(Data &&data) { + + // Tag data with seq_id/ts + const auto now = std::chrono::system_clock::now(); + const auto dur = std::chrono::duration_cast + (now.time_since_epoch()); + data.meta[cv::gapi::streaming::meta_tag::timestamp] = int64_t{dur.count()}; + data.meta[cv::gapi::streaming::meta_tag::seq_id] = int64_t{m_priv->c++}; + + m_priv->q.push(data); +} + +bool QueueSourceBase::pull(Data &data) { + m_priv->q.pop(data); + + if (m_priv->halted) { + return false; + } + return true; +} + +void QueueSourceBase::halt() { + m_priv->halted.store(true); + m_priv->q.push(cv::GRunArg{}); +} + +cv::GMetaArg QueueSourceBase::descr_of() const { + return m_priv->m; +} + +QueueInput::QueueInput(const cv::GMetaArgs &args) { + for (auto &&m : args) { + m_sources.emplace_back(new cv::gapi::wip::QueueSourceBase(m)); + } +} + +void QueueInput::push(cv::GRunArgs &&args) { + GAPI_Assert(m_sources.size() == args.size()); + for (auto && it : ade::util::zip(ade::util::toRange(m_sources), + ade::util::toRange(args))) + { + auto &src = std::get<0>(it); + auto &obj = std::get<1>(it); + + Data d; + d = obj; + src->push(std::move(d)); + } +} + +QueueInput::operator cv::GRunArgs () { + cv::GRunArgs args; + for (auto &&s : m_sources) { + args.push_back(s->ptr()); + } + return args; +} + +} // wip +} // gapi +} // cv diff --git a/modules/gapi/test/streaming/gapi_streaming_queue_source_tests.cpp b/modules/gapi/test/streaming/gapi_streaming_queue_source_tests.cpp new file mode 100644 index 0000000000..093e654715 --- /dev/null +++ b/modules/gapi/test/streaming/gapi_streaming_queue_source_tests.cpp @@ -0,0 +1,127 @@ +// This file is part of OpenCV project. +// It is subject to the license terms in the LICENSE file found in the top-level directory +// of this distribution and at http://opencv.org/license.html. +// +// Copyright (C) 2023 Intel Corporation + + +#include "../test_precomp.hpp" + +#include +#include +#include + +namespace opencv_test +{ + +TEST(GAPI_Streaming_Queue_Source, SmokeTest) { + // This is more like an example on G-API Queue Source + + cv::GMat in; + cv::GMat out = in + 1; + cv::GStreamingCompiled comp = cv::GComputation(in, out).compileStreaming(); + + // Queue source needs to know format information to maintain contracts + auto src = std::make_shared > + (cv::GMatDesc{CV_8U, 1, cv::Size{128, 128}}); + + comp.setSource(cv::gin(src->ptr())); + comp.start(); + + // It is perfectly legal to start a pipeline at this point - the source was passed. + // Now we can push data through the source and get the pipeline results. + + cv::Mat eye = cv::Mat::eye(cv::Size{128, 128}, CV_8UC1); + src->push(eye); // Push I (identity matrix) + src->push(eye*2); // Push I*2 + + // Now its time to pop. The data could be already processed at this point. + // Note the queue source queues are unbounded to avoid deadlocks + + cv::Mat result; + ASSERT_TRUE(comp.pull(cv::gout(result))); + EXPECT_EQ(0, cvtest::norm(eye + 1, result, NORM_INF)); + + ASSERT_TRUE(comp.pull(cv::gout(result))); + EXPECT_EQ(0, cvtest::norm(eye*2 + 1, result, NORM_INF)); +} + +TEST(GAPI_Streaming_Queue_Source, Mixed) { + // Mixing a regular "live" source (which runs on its own) with a + // manually controlled queue source may make a little sense, but + // is perfectly legal and possible. + + cv::GMat in1; + cv::GMat in2; + cv::GMat out = in2 - in1; + cv::GStreamingCompiled comp = cv::GComputation(in1, in2, out).compileStreaming(); + + // Queue source needs to know format information to maintain contracts + auto src1 = std::make_shared > + (cv::GMatDesc{CV_8U, 3, cv::Size{768, 576}}); + + std::shared_ptr src2; + auto path = findDataFile("cv/video/768x576.avi"); + try { + src2 = cv::gapi::wip::make_src(path); + } catch(...) { + throw SkipTestException("Video file can not be opened"); + } + + comp.setSource(cv::gin(src1->ptr(), src2)); // FIXME: quite inconsistent + comp.start(); + + cv::Mat eye = cv::Mat::eye(cv::Size{768, 576}, CV_8UC3); + src1->push(eye); // Push I (identity matrix) + src1->push(eye); // Push I (again) + + cv::Mat ref, result; + cv::VideoCapture cap(path); + + cap >> ref; + ASSERT_TRUE(comp.pull(cv::gout(result))); + EXPECT_EQ(0, cvtest::norm(ref - eye, result, NORM_INF)); + + cap >> ref; + ASSERT_TRUE(comp.pull(cv::gout(result))); + EXPECT_EQ(0, cvtest::norm(ref - eye, result, NORM_INF)); +} + +TEST(GAPI_Streaming_Queue_Input, SmokeTest) { + + // Queue Input: a tiny wrapper atop of multiple queue sources. + // Allows users to pass all input data at once. + + cv::GMat in1; + cv::GScalar in2; + cv::GMat out = in1 + in2; + cv::GStreamingCompiled comp = cv::GComputation(cv::GIn(in1, in2), cv::GOut(out)) + .compileStreaming(); + + // FIXME: This API is too raw + cv::gapi::wip::QueueInput input({ + cv::GMetaArg{ cv::GMatDesc{CV_8U, 1, cv::Size{64,64} } }, + cv::GMetaArg{ cv::empty_scalar_desc() } + }); + comp.setSource(input); // Implicit conversion allows it to be passed as-is. + comp.start(); + + // Push data via queue input + cv::Mat eye = cv::Mat::eye(cv::Size{64, 64}, CV_8UC1); + input.push(cv::gin(eye, cv::Scalar(1))); + input.push(cv::gin(eye, cv::Scalar(2))); + input.push(cv::gin(eye, cv::Scalar(3))); + + // Pop data and validate + cv::Mat result; + ASSERT_TRUE(comp.pull(cv::gout(result))); + EXPECT_EQ(0, cvtest::norm(eye+1, result, NORM_INF)); + + ASSERT_TRUE(comp.pull(cv::gout(result))); + EXPECT_EQ(0, cvtest::norm(eye+2, result, NORM_INF)); + + ASSERT_TRUE(comp.pull(cv::gout(result))); + EXPECT_EQ(0, cvtest::norm(eye+3, result, NORM_INF)); +} + +} // namespace opencv_test