Merge pull request #20709 from AsyaPronina:asyadev/integrate_gstreamer_source

Ported GStreamerSource to OpenCV

* Ported GStreamerSource to OpenCV

* Fixed CI failures

* Whitespaces

* Whitespaces + removed exception from destructors C4722

* Removed assert for Priv's getSS and descr_of

* Removed assert for pull

* Fixed last review comment

Co-authored-by: Pashchenkov Maxim <maxim.pashchenkov@intel.com>
pull/21212/head
Anastasiya(Asya) Pronina 3 years ago committed by GitHub
parent 973e1acb67
commit 8dd6882222
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 25
      modules/gapi/CMakeLists.txt
  2. 47
      modules/gapi/include/opencv2/gapi/streaming/gstreamer/gstreamerpipeline.hpp
  3. 89
      modules/gapi/include/opencv2/gapi/streaming/gstreamer/gstreamersource.hpp
  4. 2
      modules/gapi/include/opencv2/gapi/streaming/onevpl/device_selector_interface.hpp
  5. 27
      modules/gapi/src/streaming/gstreamer/gstreamer_buffer_utils.cpp
  6. 27
      modules/gapi/src/streaming/gstreamer/gstreamer_buffer_utils.hpp
  7. 122
      modules/gapi/src/streaming/gstreamer/gstreamer_media_adapter.cpp
  8. 63
      modules/gapi/src/streaming/gstreamer/gstreamer_media_adapter.hpp
  9. 314
      modules/gapi/src/streaming/gstreamer/gstreamer_pipeline_facade.cpp
  10. 89
      modules/gapi/src/streaming/gstreamer/gstreamer_pipeline_facade.hpp
  11. 90
      modules/gapi/src/streaming/gstreamer/gstreamerenv.cpp
  12. 37
      modules/gapi/src/streaming/gstreamer/gstreamerenv.hpp
  13. 112
      modules/gapi/src/streaming/gstreamer/gstreamerpipeline.cpp
  14. 58
      modules/gapi/src/streaming/gstreamer/gstreamerpipeline_priv.hpp
  15. 177
      modules/gapi/src/streaming/gstreamer/gstreamerptr.hpp
  16. 383
      modules/gapi/src/streaming/gstreamer/gstreamersource.cpp
  17. 94
      modules/gapi/src/streaming/gstreamer/gstreamersource_priv.hpp
  18. 188
      modules/gapi/test/streaming/gapi_gstreamer_pipeline_facade_int_tests.cpp
  19. 401
      modules/gapi/test/streaming/gapi_gstreamersource_tests.cpp

@ -47,12 +47,14 @@ file(GLOB gapi_ext_hdrs
"${CMAKE_CURRENT_LIST_DIR}/include/opencv2/${name}/infer/*.hpp"
"${CMAKE_CURRENT_LIST_DIR}/include/opencv2/${name}/ocl/*.hpp"
"${CMAKE_CURRENT_LIST_DIR}/include/opencv2/${name}/own/*.hpp"
"${CMAKE_CURRENT_LIST_DIR}/include/opencv2/${name}/plaidml/*.hpp"
"${CMAKE_CURRENT_LIST_DIR}/include/opencv2/${name}/python/*.hpp"
"${CMAKE_CURRENT_LIST_DIR}/include/opencv2/${name}/render/*.hpp"
"${CMAKE_CURRENT_LIST_DIR}/include/opencv2/${name}/s11n/*.hpp"
"${CMAKE_CURRENT_LIST_DIR}/include/opencv2/${name}/streaming/*.hpp"
"${CMAKE_CURRENT_LIST_DIR}/include/opencv2/${name}/plaidml/*.hpp"
"${CMAKE_CURRENT_LIST_DIR}/include/opencv2/${name}/streaming/gstreamer/*.hpp"
"${CMAKE_CURRENT_LIST_DIR}/include/opencv2/${name}/streaming/onevpl/*.hpp"
"${CMAKE_CURRENT_LIST_DIR}/include/opencv2/${name}/util/*.hpp"
"${CMAKE_CURRENT_LIST_DIR}/include/opencv2/${name}/python/*.hpp"
)
set(gapi_srcs
@ -163,7 +165,7 @@ set(gapi_srcs
src/backends/ie/bindings_ie.cpp
src/backends/python/gpythonbackend.cpp
# Streaming source
# OpenVPL Streaming source
src/streaming/onevpl/source.cpp
src/streaming/onevpl/source_priv.cpp
src/streaming/onevpl/file_data_provider.cpp
@ -186,6 +188,14 @@ set(gapi_srcs
src/streaming/onevpl/cfg_param_device_selector.cpp
src/streaming/onevpl/device_selector_interface.cpp
# GStreamer Streaming source
src/streaming/gstreamer/gstreamer_pipeline_facade.cpp
src/streaming/gstreamer/gstreamerpipeline.cpp
src/streaming/gstreamer/gstreamersource.cpp
src/streaming/gstreamer/gstreamer_buffer_utils.cpp
src/streaming/gstreamer/gstreamer_media_adapter.cpp
src/streaming/gstreamer/gstreamerenv.cpp
# Utils (ITT tracing)
src/utils/itt.cpp
)
@ -283,6 +293,15 @@ if(HAVE_GAPI_ONEVPL)
endif()
endif()
if(HAVE_GSTREAMER)
if(TARGET opencv_test_gapi)
ocv_target_compile_definitions(opencv_test_gapi PRIVATE -DHAVE_GSTREAMER)
ocv_target_link_libraries(opencv_test_gapi PRIVATE ocv.3rdparty.gstreamer)
endif()
ocv_target_compile_definitions(${the_module} PRIVATE -DHAVE_GSTREAMER)
ocv_target_link_libraries(${the_module} PRIVATE ocv.3rdparty.gstreamer)
endif()
if(WIN32)
# Required for htonl/ntohl on Windows
ocv_target_link_libraries(${the_module} PRIVATE wsock32 ws2_32)

@ -0,0 +1,47 @@
// This file is part of OpenCV project.
// It is subject to the license terms in the LICENSE file found in the top-level directory
// of this distribution and at http://opencv.org/license.html.
//
// Copyright (C) 2021 Intel Corporation
#ifndef OPENCV_GAPI_STREAMING_GSTREAMER_GSTREAMERPIPELINE_HPP
#define OPENCV_GAPI_STREAMING_GSTREAMER_GSTREAMERPIPELINE_HPP
#include <opencv2/gapi/streaming/gstreamer/gstreamersource.hpp>
#include <opencv2/gapi/own/exports.hpp>
#include <string>
#include <unordered_map>
#include <memory>
namespace cv {
namespace gapi {
namespace wip {
namespace gst {
// Facade over a single GStreamer pipeline built from a gst-launch-style textual
// description. Owns the pipeline (via Priv) and vends one IStreamSource per
// appsink element found in the pipeline.
class GAPI_EXPORTS GStreamerPipeline
{
public:
class Priv;

// Parses the textual description and constructs the pipeline.
// Throws std::runtime_error if the description cannot be parsed.
explicit GStreamerPipeline(const std::string& pipeline);

// Creates a streaming source bound to the appsink element with the given name.
// Each appsink may be used to create at most one source; 'outputType' selects
// whether the source produces cv::Mat or cv::MediaFrame (see GStreamerSource).
IStreamSource::Ptr getStreamingSource(const std::string& appsinkName,
const GStreamerSource::OutputType outputType =
GStreamerSource::OutputType::MAT);

virtual ~GStreamerPipeline();

protected:
// Delegating constructor for derived/testing code that supplies its own Priv.
explicit GStreamerPipeline(std::unique_ptr<Priv> priv);

std::unique_ptr<Priv> m_priv;
};
} // namespace gst
using GStreamerPipeline = gst::GStreamerPipeline;
} // namespace wip
} // namespace gapi
} // namespace cv
#endif // OPENCV_GAPI_STREAMING_GSTREAMER_GSTREAMERPIPELINE_HPP

@ -0,0 +1,89 @@
// This file is part of OpenCV project.
// It is subject to the license terms in the LICENSE file found in the top-level directory
// of this distribution and at http://opencv.org/license.html.
//
// Copyright (C) 2021 Intel Corporation
#ifndef OPENCV_GAPI_STREAMING_GSTREAMER_GSTREAMERSOURCE_HPP
#define OPENCV_GAPI_STREAMING_GSTREAMER_GSTREAMERSOURCE_HPP
#include <opencv2/gapi/streaming/source.hpp>
#include <opencv2/gapi/garg.hpp>
#include <memory>
namespace cv {
namespace gapi {
namespace wip {
namespace gst {
/**
* @brief OpenCV's GStreamer streaming source.
* Streams cv::Mat-s/cv::MediaFrame from passed GStreamer pipeline.
*
* This class implements IStreamSource interface.
*
* To create GStreamerSource instance you need to pass 'pipeline' and, optionally, 'outputType'
* arguments into constructor.
* 'pipeline' should represent GStreamer pipeline in form of textual description.
* Almost any custom pipeline is supported which can be successfully run via gst-launch.
* The only two limitations are:
* - there should be __one__ appsink element in the pipeline to pass data to OpenCV app.
* Pipeline can actually contain many sink elements, but it must have one and only one
* appsink among them.
*
* - data passed to appsink should be video-frame in NV12 format.
*
* 'outputType' is used to select type of output data to produce: 'cv::MediaFrame' or 'cv::Mat'.
* To produce 'cv::MediaFrame'-s you need to pass 'GStreamerSource::OutputType::FRAME' and,
* correspondingly, 'GStreamerSource::OutputType::MAT' to produce 'cv::Mat'-s.
* Please note, that in the last case, output 'cv::Mat' will be of BGR format, internal conversion
* from NV12 GStreamer data will happen.
* Default value for 'outputType' is 'GStreamerSource::OutputType::MAT'.
*
* @note Stream sources are passed to G-API via shared pointers, so please use gapi::make_src<>
* to create objects and ptr() to pass a GStreamerSource to cv::gin().
*
* @note You need to build OpenCV with GStreamer support to use this class.
*/
class GStreamerPipelineFacade;
class GAPI_EXPORTS GStreamerSource : public IStreamSource
{
public:
class Priv;

// Indicates what type of data should be produced by GStreamerSource: cv::MediaFrame or cv::Mat
enum class OutputType {
FRAME,
MAT
};

// Builds a source that owns its own pipeline, parsed from the textual description.
// The pipeline must contain exactly one appsink element (see class docs above).
GStreamerSource(const std::string& pipeline,
const GStreamerSource::OutputType outputType =
GStreamerSource::OutputType::MAT);

// Builds a source on top of an already-created pipeline facade, attached to the
// appsink element with the given name. Used by GStreamerPipeline to share one
// pipeline between several sources.
GStreamerSource(std::shared_ptr<GStreamerPipelineFacade> pipeline,
const std::string& appsinkName,
const GStreamerSource::OutputType outputType =
GStreamerSource::OutputType::MAT);

// IStreamSource interface: pulls the next frame; returns false on end-of-stream.
bool pull(cv::gapi::wip::Data& data) override;
// IStreamSource interface: describes the produced data (GMatDesc or GFrameDesc).
GMetaArg descr_of() const override;

~GStreamerSource() override;

protected:
// Delegating constructor for derived/testing code that supplies its own Priv.
explicit GStreamerSource(std::unique_ptr<Priv> priv);

std::unique_ptr<Priv> m_priv;
};
} // namespace gst
using GStreamerSource = gst::GStreamerSource;
} // namespace wip
} // namespace gapi
} // namespace cv
#endif // OPENCV_GAPI_STREAMING_GSTREAMER_GSTREAMERSOURCE_HPP

@ -19,7 +19,7 @@ namespace gapi {
namespace wip {
namespace onevpl {
enum class AccelType : uint8_t {
enum class AccelType: uint8_t {
HOST,
DX11,

@ -0,0 +1,27 @@
// This file is part of OpenCV project.
// It is subject to the license terms in the LICENSE file found in the top-level directory
// of this distribution and at http://opencv.org/license.html.
//
// Copyright (C) 2021 Intel Corporation
#include "gstreamer_buffer_utils.hpp"
#include "gstreamerptr.hpp"
#include <opencv2/gapi/own/assert.hpp>
#ifdef HAVE_GSTREAMER
namespace cv {
namespace gapi {
namespace wip {
namespace gstreamer_utils {
// Maps 'buffer' into system memory as a video frame laid out according to 'info'.
// 'mapFlags' selects the access mode (GST_MAP_READ / GST_MAP_WRITE).
// Throws (via GAPI_Assert) if GStreamer fails to map the buffer.
void mapBufferToFrame(GstBuffer& buffer, GstVideoInfo& info, GstVideoFrame& frame,
GstMapFlags mapFlags) {
bool mapped = gst_video_frame_map(&frame, &info, &buffer, mapFlags);
GAPI_Assert(mapped && "Failed to map GStreamer buffer to system memory as video-frame!");
}
} // namespace gstreamer_utils
} // namespace wip
} // namespace gapi
} // namespace cv
#endif // HAVE_GSTREAMER

@ -0,0 +1,27 @@
// This file is part of OpenCV project.
// It is subject to the license terms in the LICENSE file found in the top-level directory
// of this distribution and at http://opencv.org/license.html.
//
// Copyright (C) 2021 Intel Corporation
#ifndef OPENCV_GAPI_STREAMING_GSTREAMER_GSTREAMER_BUFFER_UTILS_HPP
#define OPENCV_GAPI_STREAMING_GSTREAMER_GSTREAMER_BUFFER_UTILS_HPP
#ifdef HAVE_GSTREAMER
#include <gst/gstbuffer.h>
#include <gst/video/video-frame.h>
namespace cv {
namespace gapi {
namespace wip {
namespace gstreamer_utils {
// Maps 'buffer' into system memory as a video frame laid out according to 'info'.
// Asserts on mapping failure. Implemented in gstreamer_buffer_utils.cpp.
void mapBufferToFrame(GstBuffer& buffer, GstVideoInfo& info, GstVideoFrame& frame,
GstMapFlags map_flags);
} // namespace gstreamer_utils
} // namespace wip
} // namespace gapi
} // namespace cv
#endif // HAVE_GSTREAMER
#endif // OPENCV_GAPI_STREAMING_GSTREAMER_GSTREAMER_BUFFER_UTILS_HPP

@ -0,0 +1,122 @@
// This file is part of OpenCV project.
// It is subject to the license terms in the LICENSE file found in the top-level directory
// of this distribution and at http://opencv.org/license.html.
//
// Copyright (C) 2021 Intel Corporation
#include "gstreamer_media_adapter.hpp"
#include "gstreamer_buffer_utils.hpp"
#ifdef HAVE_GSTREAMER
namespace cv {
namespace gapi {
namespace wip {
namespace gst {
// Wraps a GStreamer buffer as a cv::MediaFrame adapter without mapping it yet:
// takes its own reference on 'buffer', copies 'videoInfo', and caches per-plane
// strides/offsets. Actual mapping is deferred until the first access() call.
GStreamerMediaAdapter::GStreamerMediaAdapter(const cv::GFrameDesc& frameDesc,
GstVideoInfo* videoInfo,
GstBuffer* buffer) :
m_frameDesc(frameDesc),
m_videoInfo(gst_video_info_copy(videoInfo)),
m_buffer(gst_buffer_ref(buffer)),
m_isMapped(false)
{
#if GST_VERSION_MINOR >= 10
// Check that GstBuffer has mono-view, so we can retrieve only one video-meta
GAPI_Assert((gst_buffer_get_flags(m_buffer) & GST_VIDEO_BUFFER_FLAG_MULTIPLE_VIEW) == 0);
#endif // GST_VERSION_MINOR >= 10
GstVideoMeta* videoMeta = gst_buffer_get_video_meta(m_buffer);
// Prefer the strides/offsets attached to the buffer itself (video meta);
// fall back to the caps-derived ones from GstVideoInfo otherwise.
if (videoMeta != nullptr) {
m_strides = { videoMeta->stride[0], videoMeta->stride[1] };
m_offsets = { videoMeta->offset[0], videoMeta->offset[1] };
} else {
m_strides = { GST_VIDEO_INFO_PLANE_STRIDE(m_videoInfo.get(), 0),
GST_VIDEO_INFO_PLANE_STRIDE(m_videoInfo.get(), 1) };
m_offsets = { GST_VIDEO_INFO_PLANE_OFFSET(m_videoInfo.get(), 0),
GST_VIDEO_INFO_PLANE_OFFSET(m_videoInfo.get(), 1) };
}
}
// Unmaps the video frame if access() left it mapped. The buffer/info references
// themselves are released by the GStreamerPtr members' destructors.
GStreamerMediaAdapter::~GStreamerMediaAdapter() {
if (m_isMapped.load(std::memory_order_acquire)) {
gst_video_frame_unmap(&m_videoFrame);
m_isMapped.store(false, std::memory_order_release);
m_mappedForWrite.store(false);
}
}
// Returns the frame descriptor captured at construction time.
cv::GFrameDesc GStreamerMediaAdapter::meta() const {
return m_frameDesc;
}
// Maps the underlying GStreamer buffer (lazily, on first use) and returns a
// view over its NV12 planes.
//
// Mapping is performed once for read access; a later request for write access
// forces a remap with GST_MAP_WRITE. Concurrent read accesses are allowed;
// the framework guarantees no parallel accesses while writing.
cv::MediaFrame::View GStreamerMediaAdapter::access(cv::MediaFrame::Access access) {
    GAPI_Assert(access == cv::MediaFrame::Access::R ||
                access == cv::MediaFrame::Access::W);

    // Number of threads currently inside access(); used to detect illegal
    // concurrent accesses while the buffer is being remapped for write.
    static std::atomic<size_t> thread_counters { };
    ++thread_counters;

    // NOTE: Framework guarantees that there should be no parallel accesses to the frame
    // memory if is accessing for write.
    if (access == cv::MediaFrame::Access::W && !m_mappedForWrite.load(std::memory_order_acquire)) {
        // FIX: condition was inverted ('thread_counters > 1'): the invariant to
        // assert is that this thread is the ONLY one inside access() while the
        // frame is unmapped and remapped for write.
        GAPI_Assert(thread_counters == 1 &&
                    "Multiple access to view during mapping for write detected!");
        gst_video_frame_unmap(&m_videoFrame);
        m_isMapped.store(false);
    }

    if (!m_isMapped.load(std::memory_order_acquire)) {
        // Double-checked locking: only one thread performs the mapping.
        std::lock_guard<std::mutex> lock(m_mutex);
        if(!m_isMapped.load(std::memory_order_relaxed)) {
            // Only two-plane NV12 frames are supported by this adapter.
            GAPI_Assert(GST_VIDEO_INFO_N_PLANES(m_videoInfo.get()) == 2);
            GAPI_Assert(GST_VIDEO_INFO_FORMAT(m_videoInfo.get()) == GST_VIDEO_FORMAT_NV12);

            // TODO: Use RAII for map/unmap
            if (access == cv::MediaFrame::Access::W) {
                gstreamer_utils::mapBufferToFrame(*m_buffer, *m_videoInfo, m_videoFrame,
                                                  GST_MAP_WRITE);
                m_mappedForWrite.store(true, std::memory_order_release);
            } else {
                gstreamer_utils::mapBufferToFrame(*m_buffer, *m_videoInfo, m_videoFrame,
                                                  GST_MAP_READ);
            }

            // Mapped layout must agree with the strides/offsets cached in the ctor.
            GAPI_Assert(GST_VIDEO_FRAME_PLANE_STRIDE(&m_videoFrame, 0) == m_strides[0]);
            GAPI_Assert(GST_VIDEO_FRAME_PLANE_STRIDE(&m_videoFrame, 1) == m_strides[1]);
            GAPI_Assert(GST_VIDEO_FRAME_PLANE_OFFSET(&m_videoFrame, 0) == m_offsets[0]);
            GAPI_Assert(GST_VIDEO_FRAME_PLANE_OFFSET(&m_videoFrame, 1) == m_offsets[1]);

            m_isMapped.store(true, std::memory_order_release);
        }
    }

    cv::MediaFrame::View::Ptrs ps {
        static_cast<uint8_t*>(GST_VIDEO_FRAME_PLANE_DATA(&m_videoFrame, 0)) + m_offsets[0], // Y-plane
        static_cast<uint8_t*>(GST_VIDEO_FRAME_PLANE_DATA(&m_videoFrame, 0)) + m_offsets[1], // UV-plane
        nullptr,
        nullptr
    };
    cv::MediaFrame::View::Strides ss = {
        static_cast<std::size_t>(m_strides[0]), // Y-plane stride
        static_cast<std::size_t>(m_strides[1]), // UV-plane stride
        0u,
        0u
    };

    --thread_counters;
    return cv::MediaFrame::View(std::move(ps), std::move(ss));
}
// Not supported for GStreamer-backed frames: unconditionally fails.
cv::util::any GStreamerMediaAdapter::blobParams() const {
GAPI_Assert(false && "No implementation for GStreamerMediaAdapter::blobParams()");
}
} // namespace gst
} // namespace wip
} // namespace gapi
} // namespace cv
#endif // HAVE_GSTREAMER

@ -0,0 +1,63 @@
// This file is part of OpenCV project.
// It is subject to the license terms in the LICENSE file found in the top-level directory
// of this distribution and at http://opencv.org/license.html.
//
// Copyright (C) 2021 Intel Corporation
#ifndef OPENCV_GAPI_STREAMING_GSTREAMER_GSTREAMER_MEDIA_ADAPTER_HPP
#define OPENCV_GAPI_STREAMING_GSTREAMER_GSTREAMER_MEDIA_ADAPTER_HPP
// #include <opencv2/gapi/garray.hpp>
// #include <opencv2/gapi/streaming/meta.hpp>
#include "gstreamerptr.hpp"
#include <opencv2/gapi/streaming/gstreamer/gstreamersource.hpp>
#include <atomic>
#include <mutex>
#ifdef HAVE_GSTREAMER
#include <gst/gstbuffer.h>
#include <gst/video/video-frame.h>
namespace cv {
namespace gapi {
namespace wip {
namespace gst {
// cv::MediaFrame adapter over a GStreamer NV12 buffer.
// Holds its own references to the buffer and video info; maps the buffer into
// system memory lazily on first access() and unmaps it in the destructor.
class GStreamerMediaAdapter : public cv::MediaFrame::IAdapter {
public:
explicit GStreamerMediaAdapter(const cv::GFrameDesc& frameDesc,
GstVideoInfo* videoInfo,
GstBuffer* buffer);

~GStreamerMediaAdapter() override;

virtual cv::GFrameDesc meta() const override;

cv::MediaFrame::View access(cv::MediaFrame::Access access) override;

// Not implemented: always asserts.
cv::util::any blobParams() const override;

protected:
cv::GFrameDesc m_frameDesc;           // frame description reported by meta()
GStreamerPtr<GstVideoInfo> m_videoInfo; // owned copy of the buffer's video info
GStreamerPtr<GstBuffer> m_buffer;       // owned reference to the wrapped buffer
std::vector<gint> m_strides;            // per-plane strides (Y, UV)
std::vector<gsize> m_offsets;           // per-plane offsets (Y, UV)
GstVideoFrame m_videoFrame;             // valid only while m_isMapped is true
std::atomic<bool> m_isMapped;           // whether m_videoFrame is currently mapped
std::atomic<bool> m_mappedForWrite;     // whether the mapping was done with GST_MAP_WRITE
std::mutex m_mutex;                     // guards the lazy mapping in access()
};
} // namespace gst
} // namespace wip
} // namespace gapi
} // namespace cv
#endif // HAVE_GSTREAMER
#endif // OPENCV_GAPI_STREAMING_GSTREAMER_GSTREAMER_MEDIA_ADAPTER_HPP

@ -0,0 +1,314 @@
// This file is part of OpenCV project.
// It is subject to the license terms in the LICENSE file found in the top-level directory
// of this distribution and at http://opencv.org/license.html.
//
// Copyright (C) 2021 Intel Corporation
#include "gstreamerenv.hpp"
#include "gstreamer_pipeline_facade.hpp"
#include <opencv2/gapi/streaming/meta.hpp>
#include <logger.hpp>
#include <opencv2/imgproc.hpp>
#ifdef HAVE_GSTREAMER
#include <gst/app/gstappsink.h>
#include <gst/gstbuffer.h>
#include <gst/video/video-frame.h>
#include <gst/pbutils/missing-plugins.h>
namespace cv {
namespace gapi {
namespace wip {
namespace gst {
// Private default constructor: initializes the state flags only. It exists so
// the public constructor can delegate to it and still be allowed to throw.
GStreamerPipelineFacade::GStreamerPipelineFacade():
m_isPrerolled(false),
m_isPlaying(false)
{ }
// Initializes the GStreamer library (once per process) and builds the pipeline
// from its textual description. Throws std::runtime_error on a parse failure.
GStreamerPipelineFacade::GStreamerPipelineFacade(const std::string& pipelineDesc):
GStreamerPipelineFacade()
{
m_pipelineDesc = pipelineDesc;
// Initialize GStreamer library:
GStreamerEnv::init();
// Create GStreamer pipeline:
GError* error = NULL;
// gst_parse_launch [transfer floating]
// Take ownership of the floating reference returned by gst_parse_launch.
m_pipeline = GST_ELEMENT(g_object_ref_sink(
gst_parse_launch(m_pipelineDesc.c_str(), &error)));
GStreamerPtr<GError> err(error);
if (err)
{
cv::util::throw_error(
std::runtime_error("Error in parsing pipeline: " + std::string(err->message)));
}
}
// The destructors are noexcept by default (since C++11).
// Stops the pipeline (transition to GST_STATE_NULL), swallowing any error so the
// destructor cannot throw (see C4722 note in the PR description).
GStreamerPipelineFacade::~GStreamerPipelineFacade()
{
// There is no mutex acquisition here, because we assume that no one will call this method
// directly.
// Destructor may be called on empty GStreamerSource object in case if
// exception is thrown during construction.
if (m_pipeline && GST_IS_ELEMENT(m_pipeline.get()))
{
try
{
setState(GST_STATE_NULL);
}
catch(...)
{
GAPI_LOG_WARNING(NULL, "Unable to stop pipeline in destructor.\n");
}
// NOTE(review): release() appears to drop ownership without unref'ing the
// element — confirm GStreamerPtr::release() semantics (gstreamerptr.hpp)
// to make sure the pipeline reference is not leaked here.
m_pipeline.release();
}
}
std::vector<GstElement*> GStreamerPipelineFacade::getElementsByFactoryName(
const std::string& factoryName)
{
std::vector<GstElement*> outElements = getElements(
[&factoryName](GstElement* element) {
GStreamerPtr<gchar> name(
gst_object_get_name(GST_OBJECT(gst_element_get_factory(element))));
return name && (0 == strcmp(name, factoryName.c_str()));
});
return outElements;
}
// Returns the pipeline element with the given name, or nullptr if absent.
// Asserts that at most one element matches (names are unique in a pipeline).
GstElement* GStreamerPipelineFacade::getElementByName(const std::string& elementName)
{
    // Predicate: the candidate's element name matches 'elementName' exactly.
    auto matchesName = [&elementName](GstElement* candidate) {
        GStreamerPtr<gchar> candidateName(gst_element_get_name(candidate));
        return candidateName && (strcmp(candidateName, elementName.c_str()) == 0);
    };

    auto found = getElements(matchesName);
    if (found.empty())
    {
        return nullptr;
    }

    GAPI_Assert(1ul == found.size());
    return found.front();
}
// Moves the pipeline into the prerolled state exactly once (PAUSED, or PLAYING
// for pipelines with live sources). MT-safe via double-checked locking.
//
// FIXME: If there are multiple sources in the pipeline and one of them is live,
//        the pipeline returns GST_STATE_CHANGE_NO_PREROLL while pausing.
//        But the appsink may not be connected to this live source — only to
//        other, non-live ones — in which case starting playback is not required
//        for the appsink to complete the preroll.
//        Starting playback of the non-live sources before the first frame pull
//        will lead to losing some amount of frames, and the first pulled frame
//        can be far from the actual first one.
//
//        Need to handle this case, or explicitly forbid (assert) mixing sources
//        of different categories (live, non-live) in the pipeline.
void GStreamerPipelineFacade::completePreroll() {
if (!m_isPrerolled.load(std::memory_order_acquire))
{
std::lock_guard<std::mutex> lock(m_stateChangeMutex);
if(!m_isPrerolled.load(std::memory_order_relaxed))
{
PipelineState state = queryState();
// Only move forward in the pipeline's state machine
GAPI_Assert(state.current != GST_STATE_PLAYING);
GAPI_Assert(state.pending == GST_STATE_VOID_PENDING);
GstStateChangeReturn status = gst_element_set_state(m_pipeline, GST_STATE_PAUSED);
checkBusMessages();
// NO_PREROLL means a live source is present: preroll completes only in PLAYING.
if (status == GST_STATE_CHANGE_NO_PREROLL)
{
status = gst_element_set_state(m_pipeline, GST_STATE_PLAYING);
m_isPlaying.store(true);
}
verifyStateChange(status);
m_isPrerolled.store(true, std::memory_order_release);
}
}
}
// Starts playback exactly once. MT-safe via double-checked locking on
// m_isPlaying; also marks the pipeline as prerolled (PLAYING implies preroll).
void GStreamerPipelineFacade::play()
{
if (!m_isPlaying.load(std::memory_order_acquire))
{
std::lock_guard<std::mutex> lock(m_stateChangeMutex);
if (!m_isPlaying.load(std::memory_order_relaxed))
{
setState(GST_STATE_PLAYING);
m_isPlaying.store(true, std::memory_order_release);
m_isPrerolled.store(true);
}
}
}
// MT-safe check of whether play() (or the live-source branch of
// completePreroll()) has already started playback.
bool GStreamerPipelineFacade::isPlaying() {
return m_isPlaying.load();
}
// Iterates over all elements in the pipeline bin and collects the ones accepted
// by 'comparator'. Returned pointers are non-owning (borrowed from the bin).
std::vector<GstElement*> GStreamerPipelineFacade::getElements(
std::function<bool(GstElement*)> comparator)
{
std::vector<GstElement*> outElements;
GStreamerPtr<GstIterator> it(gst_bin_iterate_elements(GST_BIN(m_pipeline.get())));
GValue value = G_VALUE_INIT;
GstIteratorResult status = gst_iterator_next(it, &value);
while (status != GST_ITERATOR_DONE && status != GST_ITERATOR_ERROR)
{
if (status == GST_ITERATOR_OK)
{
GstElement* element = GST_ELEMENT(g_value_get_object(&value));
if (comparator(element))
{
outElements.push_back(GST_ELEMENT(element));
}
// Release the reference held by the GValue before the next iteration.
g_value_unset(&value);
}
else if (status == GST_ITERATOR_RESYNC)
{
// The bin changed concurrently; restart the iteration from scratch.
gst_iterator_resync(it);
}
status = gst_iterator_next(it, &value);
}
return outElements;
}
// Queries the pipeline's current and pending states, waiting up to 5 seconds
// for an in-progress state transition to settle.
PipelineState GStreamerPipelineFacade::queryState()
{
GAPI_Assert(m_pipeline && GST_IS_ELEMENT(m_pipeline.get()) &&
"GStreamer pipeline has not been created!");
PipelineState state;
GstClockTime timeout = 5 * GST_SECOND;
gst_element_get_state(m_pipeline, &state.current, &state.pending, timeout);
return state;
}
// Requests a transition to 'newState' (no-op if already there) and verifies it
// completed. Requires that no other transition is pending.
void GStreamerPipelineFacade::setState(GstState newState)
{
PipelineState state = queryState();
GAPI_Assert(state.pending == GST_STATE_VOID_PENDING);
if (state.current != newState)
{
GstStateChangeReturn status = gst_element_set_state(m_pipeline, newState);
verifyStateChange(status);
}
}
// Waits out an asynchronous state change and throws std::runtime_error if the
// transition ultimately failed; always drains the bus for diagnostics.
void GStreamerPipelineFacade::verifyStateChange(GstStateChangeReturn status)
{
if (status == GST_STATE_CHANGE_ASYNC)
{
// Wait for status update.
status = gst_element_get_state(m_pipeline, NULL, NULL, GST_CLOCK_TIME_NONE);
}
if (status == GST_STATE_CHANGE_FAILURE)
{
checkBusMessages();
PipelineState state = queryState();
const gchar* currentState = gst_element_state_get_name(state.current);
// NOTE(review): after a failure the pending state may be VOID_PENDING, so the
// message may not name the state that was actually requested — confirm.
const gchar* pendingState = gst_element_state_get_name(state.pending);
cv::util::throw_error(
std::runtime_error(std::string("Unable to change pipeline state from ") +
std::string(currentState) + std::string(" to ") +
std::string(pendingState)));
}
checkBusMessages();
}
// Handles GStreamer bus messages.
// For debugging purposes: drains every pending message, logs pipeline state
// changes and playback errors, and throws std::runtime_error when a required
// GStreamer plugin is reported missing.
void GStreamerPipelineFacade::checkBusMessages() const
{
    GStreamerPtr<GstBus> bus(gst_element_get_bus(m_pipeline));
    while (gst_bus_have_pending(bus))
    {
        GStreamerPtr<GstMessage> msg(gst_bus_pop(bus));
        if (!msg || !GST_IS_MESSAGE(msg.get()))
        {
            continue;
        }

        if (gst_is_missing_plugin_message(msg))
        {
            GStreamerPtr<gchar> descr(gst_missing_plugin_message_get_description(msg));
            // FIX: the two literals were concatenated with no separator, producing
            // "...required plugin!Details: ..." in the thrown message.
            cv::util::throw_error(
                std::runtime_error("Your GStreamer installation is missing a required plugin!\n"
                                   "Details: " + std::string(descr)));
        }
        else
        {
            switch (GST_MESSAGE_TYPE(msg))
            {
                case GST_MESSAGE_STATE_CHANGED:
                {
                    // Log only the top-level pipeline's transitions, not every element's.
                    if (GST_MESSAGE_SRC(msg.get()) == GST_OBJECT(m_pipeline.get()))
                    {
                        GstState oldState = GST_STATE_NULL,
                                 newState = GST_STATE_NULL;
                        gst_message_parse_state_changed(msg, &oldState, &newState, NULL);
                        const gchar* oldStateName = gst_element_state_get_name(oldState);
                        const gchar* newStateName = gst_element_state_get_name(newState);
                        GAPI_LOG_INFO(NULL, "Pipeline state changed from " << oldStateName << " to "
                                            << newStateName);
                    }
                    break;
                }
                case GST_MESSAGE_ERROR:
                {
                    GError* error = NULL;
                    gchar* debug = NULL;
                    gst_message_parse_error(msg, &error, &debug); // transfer full for out args
                    GStreamerPtr<GError> err(error);
                    GStreamerPtr<gchar> deb(debug);
                    GStreamerPtr<gchar> name(gst_element_get_name(GST_MESSAGE_SRC(msg.get())));
                    GAPI_LOG_WARNING(NULL, "Embedded video playback halted; module " << name.get()
                                           << " reported: " << err->message);
                    GAPI_LOG_WARNING(NULL, "GStreamer debug: " << deb);
                    break;
                }
                default:
                    break;
            }
        }
    }
}
} // namespace gst
} // namespace wip
} // namespace gapi
} // namespace cv
#endif // HAVE_GSTREAMER

@ -0,0 +1,89 @@
// This file is part of OpenCV project.
// It is subject to the license terms in the LICENSE file found in the top-level directory
// of this distribution and at http://opencv.org/license.html.
//
// Copyright (C) 2021 Intel Corporation
#ifndef OPENCV_GAPI_STREAMING_GSTREAMER_GSTREAMER_PIPELINE_FACADE_HPP
#define OPENCV_GAPI_STREAMING_GSTREAMER_GSTREAMER_PIPELINE_FACADE_HPP
#include "gstreamerptr.hpp"
#include <string>
#include <atomic>
#include <mutex>
#ifdef HAVE_GSTREAMER
#include <gst/gst.h>
namespace cv {
namespace gapi {
namespace wip {
namespace gst {
// GAPI_EXPORTS here is only for testing purposes.
// Snapshot of a pipeline's GStreamer state machine: the settled state plus any
// transition still in flight (GST_STATE_VOID_PENDING when none).
struct GAPI_EXPORTS PipelineState
{
GstState current = GST_STATE_NULL;
GstState pending = GST_STATE_NULL;
};
// This class represents facade for pipeline GstElement and related functions.
// Class restricts pipeline to only move forward in its state machine:
// NULL -> READY -> PAUSED -> PLAYING.
// There is no possibility to pause and resume pipeline, it can be only once played.
// GAPI_EXPORTS here is only for testing purposes.
class GAPI_EXPORTS GStreamerPipelineFacade
{
public:
// Strong exception guarantee.
// Parses the gst-launch-style description and owns the resulting pipeline.
explicit GStreamerPipelineFacade(const std::string& pipeline);
// The destructors are noexcept by default. (since C++11)
// Stops the pipeline; never throws.
~GStreamerPipelineFacade();

// Elements getters are not guarded with mutexes because elements order is not supposed
// to change in the pipeline.
std::vector<GstElement*> getElementsByFactoryName(const std::string& factoryName);
GstElement* getElementByName(const std::string& elementName);

// Pipeline state modifiers: can be called only once, MT-safe, mutually exclusive.
void completePreroll();
void play();

// Pipeline state checker: MT-safe.
bool isPlaying();

private:
std::string m_pipelineDesc;            // textual description the pipeline was built from
GStreamerPtr<GstElement> m_pipeline;   // owned top-level pipeline element
std::atomic<bool> m_isPrerolled;       // set once by completePreroll()/play()
std::atomic<bool> m_isPlaying;         // set once when playback starts
// Mutex to guard state(paused, playing) from changes from multiple threads
std::mutex m_stateChangeMutex;

private:
// This constructor is needed only to make public constructor as delegating constructor
// and allow it to throw exceptions.
GStreamerPipelineFacade();

// Elements getter is not guarded with mutex because elements order is not supposed
// to change in the pipeline.
std::vector<GstElement*> getElements(std::function<bool(GstElement*)> comparator);

// Getters, modifiers, verifiers are not MT-safe, because they are called from
// MT-safe mutually exclusive public functions.
PipelineState queryState();
void setState(GstState state);
void verifyStateChange(GstStateChangeReturn status);
void checkBusMessages() const;
};
} // namespace gst
} // namespace wip
} // namespace gapi
} // namespace cv
#endif // HAVE_GSTREAMER
#endif // OPENCV_GAPI_STREAMING_GSTREAMER_GSTREAMER_PIPELINE_FACADE_HPP

@ -0,0 +1,90 @@
// This file is part of OpenCV project.
// It is subject to the license terms in the LICENSE file found in the top-level directory
// of this distribution and at http://opencv.org/license.html.
//
// Copyright (C) 2021 Intel Corporation
#include "gstreamerenv.hpp"
#include "gstreamerptr.hpp"
#ifdef HAVE_GSTREAMER
#include <gst/gst.h>
#endif // HAVE_GSTREAMER
namespace cv {
namespace gapi {
namespace wip {
namespace gst {
#ifdef HAVE_GSTREAMER
// Returns the process-wide GStreamerEnv singleton, initializing the GStreamer
// library on first call (thread-safe via C++11 static-local initialization).
const GStreamerEnv& GStreamerEnv::init()
{
static GStreamerEnv gInit;
return gInit;
}
// Initializes the GStreamer library (if not already initialized elsewhere in
// the process) and verifies that the runtime library's MAJOR version matches
// the one the code was compiled against. Throws std::runtime_error on either
// an initialization failure or a version mismatch.
GStreamerEnv::GStreamerEnv()
{
    if (!gst_is_initialized())
    {
        GError* error = NULL;
        gst_init_check(NULL, NULL, &error);

        GStreamerPtr<GError> err(error);
        if (err)
        {
            cv::util::throw_error(
                // FIX: typo in the user-facing message ("initializaton").
                std::runtime_error(std::string("GStreamer initialization error! Details: ") +
                                   err->message));
        }
    }

    // FIXME: GStreamer libs which have same MAJOR and MINOR versions are API and ABI compatible.
    //        If GStreamer runtime MAJOR version differs from the version the application was
    //        compiled with, will it fail on the linkage stage? If so, the code below isn't needed.
    guint major, minor, micro, nano;
    gst_version(&major, &minor, &micro, &nano);
    if (GST_VERSION_MAJOR != major)
    {
        cv::util::throw_error(
            std::runtime_error(std::string("Incompatible GStreamer version: compiled with ") +
                               std::to_string(GST_VERSION_MAJOR) + '.' +
                               std::to_string(GST_VERSION_MINOR) + '.' +
                               std::to_string(GST_VERSION_MICRO) + '.' +
                               std::to_string(GST_VERSION_NANO) +
                               ", but runtime has " +
                               std::to_string(major) + '.' + std::to_string(minor) + '.' +
                               std::to_string(micro) + '.' + std::to_string(nano) + '.'));
    }
}
// Deinitializes the GStreamer library at process shutdown (the singleton's
// static-local destructor is the only caller).
GStreamerEnv::~GStreamerEnv()
{
gst_deinit();
}
#else // HAVE_GSTREAMER
// Stub used when OpenCV is built without GStreamer: any attempt to use the
// environment fails immediately.
const GStreamerEnv& GStreamerEnv::init()
{
GAPI_Assert(false && "Built without GStreamer support!");
}

GStreamerEnv::GStreamerEnv()
{
GAPI_Assert(false && "Built without GStreamer support!");
}

GStreamerEnv::~GStreamerEnv()
{
// No need an assert here. The assert raise C4722 warning. Constructor have already got assert.
}
#endif // HAVE_GSTREAMER
} // namespace gst
} // namespace wip
} // namespace gapi
} // namespace cv

@ -0,0 +1,37 @@
// This file is part of OpenCV project.
// It is subject to the license terms in the LICENSE file found in the top-level directory
// of this distribution and at http://opencv.org/license.html.
//
// Copyright (C) 2021 Intel Corporation
#ifndef OPENCV_GAPI_STREAMING_GSTREAMER_GSTREAMERENV_HPP
#define OPENCV_GAPI_STREAMING_GSTREAMER_GSTREAMERENV_HPP
namespace cv {
namespace gapi {
namespace wip {
namespace gst {
/*!
* \brief The GStreamerEnv class
* Initializes gstreamer once in the whole process
*
*
* @note You need to build OpenCV with GStreamer support to use this class.
*/
class GStreamerEnv
{
public:
// Returns the process-wide instance; GStreamer is initialized on the first call.
static const GStreamerEnv& init();

private:
// Non-constructible from outside: the only instance is init()'s static local.
GStreamerEnv();
~GStreamerEnv();
};
} // namespace gst
} // namespace wip
} // namespace gapi
} // namespace cv
#endif // OPENCV_GAPI_STREAMING_GSTREAMER_GSTREAMERENV_HPP

@ -0,0 +1,112 @@
// This file is part of OpenCV project.
// It is subject to the license terms in the LICENSE file found in the top-level directory
// of this distribution and at http://opencv.org/license.html.
//
// Copyright (C) 2021 Intel Corporation
#include "gstreamer_pipeline_facade.hpp"
#include "gstreamerpipeline_priv.hpp"
#include <opencv2/gapi/streaming/gstreamer/gstreamerpipeline.hpp>
#ifdef HAVE_GSTREAMER
#include <gst/app/gstappsink.h>
#endif // HAVE_GSTREAMER
namespace cv {
namespace gapi {
namespace wip {
namespace gst {
#ifdef HAVE_GSTREAMER
// Builds the shared pipeline facade from the textual description and registers
// every appsink element's name as available ("free") for source creation.
// Asserts that appsink names are unique within the pipeline.
GStreamerPipeline::Priv::Priv(const std::string& pipelineDesc):
m_pipeline(std::make_shared<GStreamerPipelineFacade>(pipelineDesc))
{
std::vector<GstElement*> appsinks =
m_pipeline->getElementsByFactoryName("appsink");

for (std::size_t i = 0ul; i < appsinks.size(); ++i)
{
auto* appsink = appsinks[i];
GAPI_Assert(appsink != nullptr);
GStreamerPtr<gchar> name(gst_element_get_name(appsink));
auto result = m_appsinkNamesToUse.insert({ name.get(), true /* free */ });
GAPI_Assert(std::get<1>(result) && "Each appsink name must be unique!");
}
}
// Creates a GStreamerSource on top of the named appsink element.
// Throws std::logic_error if the appsink does not exist or was already used;
// throws std::runtime_error if source construction fails (the appsink is then
// marked free again).
IStreamSource::Ptr GStreamerPipeline::Priv::getStreamingSource(
const std::string& appsinkName, const GStreamerSource::OutputType outputType)
{
auto appsinkNameIt = m_appsinkNamesToUse.find(appsinkName);
if (appsinkNameIt == m_appsinkNamesToUse.end())
{
cv::util::throw_error(std::logic_error(std::string("There is no appsink element in the "
"pipeline with the name '") + appsinkName + "'."));
}
if (!appsinkNameIt->second)
{
cv::util::throw_error(std::logic_error(std::string("appsink element with the name '") +
appsinkName + "' has been already used to create a GStreamerSource!"));
}

// Reserve the appsink before construction; roll back on failure below.
m_appsinkNamesToUse[appsinkName] = false /* not free */;

IStreamSource::Ptr src;
try {
src = cv::gapi::wip::make_src<cv::gapi::wip::GStreamerSource>(m_pipeline, appsinkName,
outputType);
}
catch(...) {
m_appsinkNamesToUse[appsinkName] = true; /* free */
cv::util::throw_error(std::runtime_error(std::string("Error during creation of ") +
"GStreamerSource on top of '" + appsinkName + "' appsink element!"));
}

return src;
}

GStreamerPipeline::Priv::~Priv() { }
#else // HAVE_GSTREAMER
// Stubs used when OpenCV is built without GStreamer: construction fails
// immediately, so the remaining members never execute meaningfully.
GStreamerPipeline::Priv::Priv(const std::string&)
{
GAPI_Assert(false && "Built without GStreamer support!");
}

IStreamSource::Ptr GStreamerPipeline::Priv::getStreamingSource(const std::string&,
const GStreamerSource::OutputType)
{
// No need an assert here. The assert raise C4702 warning. Constructor have already got assert.
return nullptr;
}

GStreamerPipeline::Priv::~Priv()
{
// No need an assert here. The assert raise C4722 warning. Constructor have already got assert.
}
#endif // HAVE_GSTREAMER
// Public API: thin forwarding wrappers over the Priv implementation (pImpl).
GStreamerPipeline::GStreamerPipeline(const std::string& pipelineDesc):
    m_priv(new Priv(pipelineDesc)) { }

IStreamSource::Ptr GStreamerPipeline::getStreamingSource(
    const std::string& appsinkName, const GStreamerSource::OutputType outputType)
{
    return m_priv->getStreamingSource(appsinkName, outputType);
}

// Out-of-line destructor: required since Priv is incomplete in the header.
GStreamerPipeline::~GStreamerPipeline()
{ }

// Injection constructor: takes ownership of a ready-made Priv (e.g. for tests).
GStreamerPipeline::GStreamerPipeline(std::unique_ptr<Priv> priv):
    m_priv(std::move(priv))
{ }
} // namespace gst
} // namespace wip
} // namespace gapi
} // namespace cv

@ -0,0 +1,58 @@
// This file is part of OpenCV project.
// It is subject to the license terms in the LICENSE file found in the top-level directory
// of this distribution and at http://opencv.org/license.html.
//
// Copyright (C) 2021 Intel Corporation
#ifndef OPENCV_GAPI_STREAMING_GSTREAMER_GSTREAMERPIPELINE_PRIV_HPP
#define OPENCV_GAPI_STREAMING_GSTREAMER_GSTREAMERPIPELINE_PRIV_HPP
#include <opencv2/gapi/streaming/gstreamer/gstreamerpipeline.hpp>
#include <string>
#include <unordered_map>
namespace cv {
namespace gapi {
namespace wip {
namespace gst {
#ifdef HAVE_GSTREAMER
// Private implementation of GStreamerPipeline.
// Owns the shared pipeline facade and tracks which appsink elements have
// already been handed out as GStreamerSource instances.
class GStreamerPipeline::Priv
{
public:
    // Parses and wraps the given textual GStreamer pipeline description.
    explicit Priv(const std::string& pipeline);

    // Creates a streaming source on top of the named appsink element.
    // Each appsink name may be used at most once.
    IStreamSource::Ptr getStreamingSource(const std::string& appsinkName,
                                          const GStreamerSource::OutputType outputType);

    virtual ~Priv();

protected:
    std::shared_ptr<GStreamerPipelineFacade> m_pipeline;
    // appsink element name -> true while still free (no source created on it yet)
    std::unordered_map<std::string, bool> m_appsinkNamesToUse;
};
#else // HAVE_GSTREAMER
// Stub Priv declaration used when OpenCV is built without GStreamer support.
// The definitions (see gstreamerpipeline.cpp) assert at runtime.
class GStreamerPipeline::Priv
{
public:
    explicit Priv(const std::string& pipeline);

    IStreamSource::Ptr getStreamingSource(const std::string& appsinkName,
                                          const GStreamerSource::OutputType outputType);

    virtual ~Priv();
};
#endif // HAVE_GSTREAMER
} // namespace gst
} // namespace wip
} // namespace gapi
} // namespace cv
#endif // OPENCV_GAPI_STREAMING_GSTREAMER_GSTREAMERPIPELINE_PRIV_HPP

@ -0,0 +1,177 @@
// This file is part of OpenCV project.
// It is subject to the license terms in the LICENSE file found in the top-level directory
// of this distribution and at http://opencv.org/license.html.
//
// Copyright (C) 2021 Intel Corporation
#ifndef OPENCV_GAPI_STREAMING_GSTREAMER_GSTREAMERPTR_HPP
#define OPENCV_GAPI_STREAMING_GSTREAMER_GSTREAMERPTR_HPP
#include <opencv2/gapi.hpp>
#include <utility>
#ifdef HAVE_GSTREAMER
#include <gst/gst.h>
#include <gst/video/video-frame.h>
namespace cv {
namespace gapi {
namespace wip {
namespace gst {
// Drops one strong reference from a GstObject-derived pointer; no-op for null.
template<typename T> static inline void GStreamerPtrUnrefObject(T* ptr)
{
    if (ptr == nullptr)
    {
        return;
    }
    gst_object_unref(G_OBJECT(ptr));
}
// GStreamerPtrRelease maps each GStreamer/GLib type to its proper release
// routine. The primary template is declared but not defined, so wrapping an
// unsupported type in GStreamerPtr fails at link time rather than leaking.
template<typename T> static inline void GStreamerPtrRelease(T* ptr);

template<> inline void GStreamerPtrRelease<GError>(GError* ptr)
{
    if (ptr)
    {
        g_error_free(ptr);
    }
}

// GstObject-derived types: released by dropping a strong reference.
template<> inline void GStreamerPtrRelease<GstElement>(GstElement* ptr)
{
    GStreamerPtrUnrefObject<GstElement>(ptr);
}

template<> inline void GStreamerPtrRelease<GstElementFactory>(GstElementFactory* ptr)
{
    GStreamerPtrUnrefObject<GstElementFactory>(ptr);
}

template<> inline void GStreamerPtrRelease<GstPad>(GstPad* ptr)
{
    GStreamerPtrUnrefObject<GstPad>(ptr);
}

template<> inline void GStreamerPtrRelease<GstBus>(GstBus* ptr)
{
    GStreamerPtrUnrefObject<GstBus>(ptr);
}

template<> inline void GStreamerPtrRelease<GstAllocator>(GstAllocator* ptr)
{
    GStreamerPtrUnrefObject<GstAllocator>(ptr);
}

// Non-GstObject types: each has its own dedicated free/unref function.
template<> inline void GStreamerPtrRelease<GstVideoInfo>(GstVideoInfo* ptr)
{
    if (ptr)
    {
        gst_video_info_free(ptr);
    }
}

template<> inline void GStreamerPtrRelease<GstCaps>(GstCaps* ptr)
{
    if (ptr)
    {
        gst_caps_unref(ptr);
    }
}

template<> inline void GStreamerPtrRelease<GstMemory>(GstMemory* ptr)
{
    if (ptr)
    {
        gst_memory_unref(ptr);
    }
}

template<> inline void GStreamerPtrRelease<GstBuffer>(GstBuffer* ptr)
{
    if (ptr)
    {
        gst_buffer_unref(ptr);
    }
}

template<> inline void GStreamerPtrRelease<GstSample>(GstSample* ptr)
{
    if (ptr)
    {
        gst_sample_unref(ptr);
    }
}

template<> inline void GStreamerPtrRelease<GstMessage>(GstMessage* ptr)
{
    if (ptr)
    {
        gst_message_unref(ptr);
    }
}

template<> inline void GStreamerPtrRelease<GstIterator>(GstIterator* ptr)
{
    if (ptr)
    {
        gst_iterator_free(ptr);
    }
}

template<> inline void GStreamerPtrRelease<GstQuery>(GstQuery* ptr)
{
    if (ptr)
    {
        gst_query_unref(ptr);
    }
}

// Plain glib-allocated strings (e.g. from gst_element_get_name()).
template<> inline void GStreamerPtrRelease<char>(char* ptr)
{
    if (ptr)
    {
        g_free(ptr);
    }
}
// GStreamerPtr is an owning smart pointer for memory returned from
// GStreamer/GLib calls documented as [transfer full], i.e. calls that pass
// full ownership of the returned object to the caller. On destruction or
// reassignment the proper release routine for T (GStreamerPtrRelease<T>)
// is invoked.
//
// Memory ownership and ownership transfer concept:
// https://developer.gnome.org/programming-guidelines/stable/memory-management.html.en#g-clear-object
//
// NOTE: GStreamerPtr can only own strong references, not floating ones.
// For floating references please call g_object_ref_sink(reference) before wrapping
// it with GStreamerPtr.
// See https://developer.gnome.org/gobject/stable/gobject-The-Base-Object-Type.html#floating-ref
// for floating references.
//
// NOTE: GStreamerPtr doesn't support pointers to arrays, only pointers to single objects.
template<typename T> class GStreamerPtr :
    public std::unique_ptr<T, decltype(&GStreamerPtrRelease<T>)>
{
    using BaseClass = std::unique_ptr<T, decltype(&GStreamerPtrRelease<T>)>;

public:
    constexpr GStreamerPtr() noexcept : BaseClass(nullptr, GStreamerPtrRelease<T>) { }
    constexpr GStreamerPtr(std::nullptr_t) noexcept : BaseClass(nullptr, GStreamerPtrRelease<T>) { }
    explicit GStreamerPtr(typename BaseClass::pointer p) noexcept :
        BaseClass(p, GStreamerPtrRelease<T>) { }

    // Takes ownership of a raw pointer, releasing any previously held object.
    // (The deleter is always GStreamerPtrRelease<T>, so reset() is equivalent
    // to move-assigning a freshly wrapped pointer.)
    GStreamerPtr& operator=(T* p) noexcept { this->reset(p); return *this; }

    inline operator T*() noexcept { return this->get(); }

    // There is no const correctness in GStreamer C API
    inline operator /*const*/ T*() const noexcept { return (T*)this->get(); }
};
} // namespace gst
} // namespace wip
} // namespace gapi
} // namespace cv
#endif // HAVE_GSTREAMER
#endif // OPENCV_GAPI_STREAMING_GSTREAMER_GSTREAMERPTR_HPP

@ -0,0 +1,383 @@
// This file is part of OpenCV project.
// It is subject to the license terms in the LICENSE file found in the top-level directory
// of this distribution and at http://opencv.org/license.html.
//
// Copyright (C) 2021 Intel Corporation
#include "gstreamer_buffer_utils.hpp"
#include "gstreamer_media_adapter.hpp"
#include "gstreamersource_priv.hpp"
#include <opencv2/gapi/streaming/gstreamer/gstreamersource.hpp>
#include <opencv2/gapi/streaming/meta.hpp>
#include <logger.hpp>
#include <opencv2/imgproc.hpp>
#ifdef HAVE_GSTREAMER
#include <gst/app/gstappsink.h>
#include <gst/gstbuffer.h>
#include <gst/video/video-frame.h>
#endif // HAVE_GSTREAMER
namespace cv {
namespace gapi {
namespace wip {
namespace gst {
#ifdef HAVE_GSTREAMER
// Caps accepted by the appsink: NV12 frames, either in system memory or in
// DMA-buf memory.
constexpr char NV12_CAPS_STRING[] =
    "video/x-raw,format=NV12;video/x-raw(memory:DMABuf),format=NV12";

namespace {
// Pad probe for downstream queries on the appsink's sink pad: for ALLOCATION
// queries it advertises support of GstVideoMeta, so upstream elements may
// attach per-plane layout information to the buffers they produce. All other
// query types are passed through untouched.
GstPadProbeReturn appsinkQueryCallback(GstPad*, GstPadProbeInfo* info, gpointer)
{
    GstQuery *query = GST_PAD_PROBE_INFO_QUERY(info);

    if (GST_QUERY_TYPE(query) != GST_QUERY_ALLOCATION)
        return GST_PAD_PROBE_OK;

    gst_query_add_allocation_meta(query, GST_VIDEO_META_API_TYPE, NULL);

    return GST_PAD_PROBE_HANDLED;
}
} // anonymous namespace
// Constructs the source together with its own pipeline built from the given
// textual description. The description must contain exactly one appsink
// element (other sink types are not restricted).
GStreamerSource::Priv::Priv(const std::string& pipelineDesc,
                            const GStreamerSource::OutputType outputType) :
    m_pipeline(std::make_shared<GStreamerPipelineFacade>(pipelineDesc)),
    m_outputType(outputType)
{
    GAPI_Assert((m_outputType == GStreamerSource::OutputType::FRAME ||
                 m_outputType == GStreamerSource::OutputType::MAT)
                && "Unsupported output type for GStreamerSource!");

    const auto appsinks = m_pipeline->getElementsByFactoryName("appsink");
    GAPI_Assert(1ul == appsinks.size() &&
        "GStreamerSource can accept pipeline with only 1 appsink element inside!\n"
        "Please, note, that amount of sink elements of other than appsink type is not limited.\n");

    // Keep our own strong reference to the appsink element.
    m_appsink = GST_ELEMENT(gst_object_ref(appsinks.front()));
    configureAppsink();
}
// Constructs the source on top of an externally owned pipeline, attaching to
// the appsink element with the given name (the GStreamerPipeline use case).
GStreamerSource::Priv::Priv(std::shared_ptr<GStreamerPipelineFacade> pipeline,
                            const std::string& appsinkName,
                            const GStreamerSource::OutputType outputType) :
    m_pipeline(pipeline),
    m_outputType(outputType)
{
    GAPI_Assert((m_outputType == GStreamerSource::OutputType::FRAME ||
                 m_outputType == GStreamerSource::OutputType::MAT)
                && "Unsupported output type for GStreamerSource!");

    // Validate the lookup result before taking a reference: gst_object_ref()
    // must not be called with a null pointer. (The string-based constructor
    // performs the same null check on its appsink — keep them consistent.)
    GstElement* appsink = m_pipeline->getElementByName(appsinkName);
    GAPI_Assert(appsink != nullptr);

    m_appsink = GST_ELEMENT(gst_object_ref(appsink));
    configureAppsink();
}
// Pulls one frame from the appsink and wraps it into cv::gapi::wip::Data
// according to the configured output type. On success, also attaches the
// standard streaming metadata (timestamp + sequence id). Returns false on
// end-of-stream.
bool GStreamerSource::Priv::pull(cv::gapi::wip::Data& data)
{
    bool grabbed = false;

    switch (m_outputType) {
        case GStreamerSource::OutputType::FRAME: {
            cv::MediaFrame frame;
            grabbed = retrieveFrame(frame);
            if (grabbed) {
                data = frame;
            }
            break;
        }
        case GStreamerSource::OutputType::MAT: {
            cv::Mat mat;
            grabbed = retrieveFrame(mat);
            if (grabbed) {
                data = mat;
            }
            break;
        }
    }

    if (grabbed) {
        data.meta[cv::gapi::streaming::meta_tag::timestamp] = computeTimestamp();
        data.meta[cv::gapi::streaming::meta_tag::seq_id] = m_frameId++;
    }

    return grabbed;
}
// Returns metadata describing the produced output: GFrameDesc for FRAME mode,
// GMatDesc for MAT mode. Triggers delayed metadata initialization on the
// first call, which prerolls the pipeline.
// NOTE(review): this function is marked noexcept, yet prepareVideoMeta() may
// call cv::util::throw_error() (e.g. on invalid caps), which would invoke
// std::terminate here. Consider removing noexcept both here and in the
// header declaration — TODO confirm.
GMetaArg GStreamerSource::Priv::descr_of() noexcept
{
    // Prepare frame metadata if it wasn't prepared yet.
    prepareVideoMeta();

    switch(m_outputType) {
        case GStreamerSource::OutputType::FRAME: {
            return GMetaArg { m_mediaFrameMeta };
        }
        case GStreamerSource::OutputType::MAT: {
            return GMetaArg { m_matMeta };
        }
    }

    // Unreachable for valid output types (constructor asserts on them).
    return GMetaArg { };
}
// Configures the appsink for synchronous, pull-based consumption of NV12
// frames. Must run before the pipeline transitions to PLAYING.
void GStreamerSource::Priv::configureAppsink() {
    // NOTE: appsink element should be configured before pipeline launch.
    GAPI_Assert(!m_pipeline->isPlaying());

    // TODO: is 1 single buffer really high enough?
    gst_app_sink_set_max_buffers(GST_APP_SINK(m_appsink.get()), 1);
    // Do not emit signals: all calls will be synchronous and blocking.
    gst_app_sink_set_emit_signals(GST_APP_SINK(m_appsink.get()), FALSE);

    GStreamerPtr<GstCaps> nv12Caps(gst_caps_from_string(NV12_CAPS_STRING));

    GStreamerPtr<GstPad> appsinkPad(gst_element_get_static_pad(m_appsink, "sink"));
    // Ask the upstream peer which caps it can produce; fail early if NV12 is
    // not among them.
    GStreamerPtr<GstCaps> peerCaps(gst_pad_peer_query_caps(appsinkPad, NULL));
    if (!gst_caps_can_intersect(peerCaps, nv12Caps)) {
        cv::util::throw_error(
            std::logic_error("appsink element can only consume video-frame in NV12 format in "
                             "GStreamerSource"));
    }

    // Restrict negotiation to NV12 only.
    gst_app_sink_set_caps(GST_APP_SINK(m_appsink.get()), nv12Caps);

    // Install the allocation-query probe (see appsinkQueryCallback) so that
    // upstream may attach GstVideoMeta to produced buffers.
    gst_pad_add_probe(appsinkPad, GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM, appsinkQueryCallback,
                      NULL, NULL);
}
// Lazily initializes output metadata (m_matMeta / m_mediaFrameMeta) and the
// GstVideoInfo (m_videoInfo) by prerolling the pipeline and inspecting the
// caps of the first (preroll) sample. Idempotent: does nothing after the
// first successful run.
void GStreamerSource::Priv::prepareVideoMeta()
{
    if (!m_isMetaPrepared) {
        m_pipeline->completePreroll();

        // Pull the preroll sample; with GStreamer >= 1.10 use the timed
        // variant so a broken pipeline cannot block forever.
        GStreamerPtr<GstSample> prerollSample(
#if GST_VERSION_MINOR >= 10
            gst_app_sink_try_pull_preroll(GST_APP_SINK(m_appsink.get()), 5 * GST_SECOND));
#else // GST_VERSION_MINOR < 10
        // TODO: This function may cause hang with some pipelines, need to check whether these
        //       pipelines are really wrong or not?
            gst_app_sink_pull_preroll(GST_APP_SINK(m_appsink.get())));
#endif // GST_VERSION_MINOR >= 10
        GAPI_Assert(prerollSample != nullptr);
        // Caps are owned by the sample ([transfer none]) — no unref needed.
        GstCaps* prerollCaps(gst_sample_get_caps(prerollSample));
        GAPI_Assert(prerollCaps != nullptr);

        const GstStructure* structure = gst_caps_get_structure(prerollCaps, 0);

        // Width and height held in GstCaps structure are format-independent width and height
        // of the frame. So the actual height of the output buffer in NV12 format will be
        // height * 3/2.
        int width = 0;
        int height = 0;
        if (!gst_structure_get_int(structure, "width", &width) ||
            !gst_structure_get_int(structure, "height", &height))
        {
            cv::util::throw_error(std::logic_error("Cannot query video width/height."));
        }

        switch(m_outputType) {
            case GStreamerSource::OutputType::FRAME: {
                // Construct metadata for media frame.
                m_mediaFrameMeta = GFrameDesc { cv::MediaFormat::NV12, cv::Size(width, height) };
                break;
            }
            case GStreamerSource::OutputType::MAT: {
                // Construct metadata for BGR mat.
                m_matMeta = GMatDesc { CV_8U, 3, cv::Size(width, height), false };
                break;
            }
        }

        // Fill GstVideoInfo structure to work further with GstVideoFrame class.
        if (!gst_video_info_from_caps(&m_videoInfo, prerollCaps)) {
            cv::util::throw_error(std::logic_error("preroll sample has invalid caps."));
        }
        // NV12 is a two-plane format (Y + interleaved UV) — sanity-check it.
        GAPI_Assert(GST_VIDEO_INFO_N_PLANES(&m_videoInfo) == 2);
        GAPI_Assert(GST_VIDEO_INFO_FORMAT(&m_videoInfo) == GST_VIDEO_FORMAT_NV12);

        m_isMetaPrepared = true;
    }
}
int64_t GStreamerSource::Priv::computeTimestamp()
{
GAPI_Assert(m_buffer && "Pulled buffer is nullptr!");
int64_t timestamp { };
GstClockTime pts = GST_BUFFER_PTS(m_buffer);
if (GST_CLOCK_TIME_IS_VALID(pts)) {
timestamp = GST_TIME_AS_USECONDS(pts);
} else {
const auto now = std::chrono::system_clock::now();
const auto dur = std::chrono::duration_cast<std::chrono::microseconds>
(now.time_since_epoch());
timestamp = int64_t{dur.count()};
}
return timestamp;
}
// Blocks until the next sample is available in the appsink and caches its
// buffer in m_buffer. Lazily starts the pipeline on the first call.
// Returns false on end-of-stream.
bool GStreamerSource::Priv::pullBuffer()
{
    // Start the pipeline if it is not in the playing state yet
    if (!m_isPipelinePlaying) {
        m_pipeline->play();
        m_isPipelinePlaying = true;
    }

    // Bail out if EOS
    if (gst_app_sink_is_eos(GST_APP_SINK(m_appsink.get())))
    {
        return false;
    }

    // The sample is [transfer full]; GStreamerPtr (m_sample) unrefs the
    // previous one and owns the new one.
    m_sample = gst_app_sink_pull_sample(GST_APP_SINK(m_appsink.get()));
    // TODO: GAPI_Assert because of already existed check for EOS?
    if (m_sample == nullptr)
    {
        return false;
    }

    // The buffer is owned by the sample ([transfer none]): m_buffer stays
    // valid as long as m_sample is held.
    m_buffer = gst_sample_get_buffer(m_sample);
    GAPI_Assert(m_buffer && "Grabbed sample has no buffer!");

    return true;
}
// Pulls the next NV12 buffer and converts it to a BGR cv::Mat.
// Returns false on end-of-stream.
bool GStreamerSource::Priv::retrieveFrame(cv::Mat& data)
{
    // Prepare metadata if it isn't prepared yet.
    prepareVideoMeta();

    bool result = pullBuffer();
    if (!result)
    {
        return false;
    }

    // TODO: Use RAII for map/unmap
    GstVideoFrame videoFrame;
    gstreamer_utils::mapBufferToFrame(*m_buffer, m_videoInfo, videoFrame, GST_MAP_READ);

    try
    {
        // m_matMeta holds width and height for 8U BGR frame, but actual
        // frame m_buffer we request from GStreamer pipeline has 8U NV12 format.
        // Constructing y and uv cv::Mat-s from such a m_buffer:

        // The UV plane is expected to directly follow the Y plane in the
        // mapped memory (contiguous NV12 layout).
        GAPI_Assert((uint8_t*)GST_VIDEO_FRAME_PLANE_DATA(&videoFrame, 1) ==
                    (uint8_t*)GST_VIDEO_FRAME_PLANE_DATA(&videoFrame, 0) +
                    GST_VIDEO_FRAME_PLANE_OFFSET(&videoFrame, 1));

        cv::Mat y(m_matMeta.size, CV_8UC1,
            (uint8_t*)GST_VIDEO_FRAME_PLANE_DATA(&videoFrame, 0) +
            GST_VIDEO_FRAME_PLANE_OFFSET(&videoFrame, 0),
            GST_VIDEO_FRAME_PLANE_STRIDE(&videoFrame, 0));
        cv::Mat uv(m_matMeta.size / 2, CV_8UC2,
            (uint8_t*)GST_VIDEO_FRAME_PLANE_DATA(&videoFrame, 0) +
            GST_VIDEO_FRAME_PLANE_OFFSET(&videoFrame, 1),
            GST_VIDEO_FRAME_PLANE_STRIDE(&videoFrame, 1));

        cv::cvtColorTwoPlane(y, uv, data, cv::COLOR_YUV2BGR_NV12);
    }
    catch (...)
    {
        // Unmap on both paths: the frame must not stay mapped after we leave.
        gst_video_frame_unmap(&videoFrame);
        cv::util::throw_error(std::runtime_error("NV12 buffer conversion to BGR is failed!"));
    }
    gst_video_frame_unmap(&videoFrame);

    return true;
}
// Pulls the next NV12 buffer and wraps it, zero-copy, into a cv::MediaFrame
// backed by GStreamerMediaAdapter. Returns false on end-of-stream.
bool GStreamerSource::Priv::retrieveFrame(cv::MediaFrame& data)
{
    // Metadata initialization is delayed until the first frame request.
    prepareVideoMeta();

    if (!pullBuffer())
    {
        return false;
    }

    data = cv::MediaFrame::Create<GStreamerMediaAdapter>(m_mediaFrameMeta, &m_videoInfo,
                                                         m_buffer);
    return true;
}
// Defaulted destructor body: m_appsink/m_sample are released by GStreamerPtr;
// the pipeline facade goes away with its last shared owner.
GStreamerSource::Priv::~Priv() { }

#else // HAVE_GSTREAMER

// Stub implementations used when OpenCV is built without GStreamer support:
// any attempt to construct a GStreamerSource asserts at runtime.
GStreamerSource::Priv::Priv(const std::string&, const GStreamerSource::OutputType)
{
    GAPI_Assert(false && "Built without GStreamer support!");
}

GStreamerSource::Priv::Priv(std::shared_ptr<GStreamerPipelineFacade>, const std::string&,
                            const GStreamerSource::OutputType)
{
    GAPI_Assert(false && "Built without GStreamer support!");
}

bool GStreamerSource::Priv::pull(cv::gapi::wip::Data&)
{
    // No need an assert here. Constructor have already got assert.
    return false;
}

GMetaArg GStreamerSource::Priv::descr_of() const noexcept
{
    // No need an assert here. The assert raise C4702 warning. Constructor have already got assert.
    return GMetaArg{};
}

GStreamerSource::Priv::~Priv()
{
    // No need an assert here. The assert raise C4722 warning. Constructor have already got assert.
}
#endif // HAVE_GSTREAMER
// Public API: thin forwarding wrappers over the Priv implementation (pImpl).
GStreamerSource::GStreamerSource(const std::string& pipeline,
                                 const GStreamerSource::OutputType outputType):
    m_priv(new Priv(pipeline, outputType)) { }

GStreamerSource::GStreamerSource(std::shared_ptr<GStreamerPipelineFacade> pipeline,
                                 const std::string& appsinkName,
                                 const GStreamerSource::OutputType outputType):
    m_priv(new Priv(pipeline, appsinkName, outputType)) { }

bool GStreamerSource::pull(cv::gapi::wip::Data& data)
{
    return m_priv->pull(data);
}

GMetaArg GStreamerSource::descr_of() const
{
    return m_priv->descr_of();
}

// Out-of-line destructor: required since Priv is incomplete in the header.
GStreamerSource::~GStreamerSource()
{ }

// Injection constructor: takes ownership of a ready-made Priv (e.g. for tests).
GStreamerSource::GStreamerSource(std::unique_ptr<Priv> priv):
    m_priv(std::move(priv))
{ }
} // namespace gst
} // namespace wip
} // namespace gapi
} // namespace cv

@ -0,0 +1,94 @@
// This file is part of OpenCV project.
// It is subject to the license terms in the LICENSE file found in the top-level directory
// of this distribution and at http://opencv.org/license.html.
//
// Copyright (C) 2021 Intel Corporation
#ifndef OPENCV_GAPI_STREAMING_GSTREAMER_GSTREAMERSOURCE_PRIV_HPP
#define OPENCV_GAPI_STREAMING_GSTREAMER_GSTREAMERSOURCE_PRIV_HPP
#include "gstreamerptr.hpp"
#include "gstreamer_pipeline_facade.hpp"
#include <opencv2/gapi/streaming/gstreamer/gstreamersource.hpp>
#include <string>
#ifdef HAVE_GSTREAMER
#include <gst/gst.h>
#include <gst/video/video-frame.h>
#endif // HAVE_GSTREAMER
namespace cv {
namespace gapi {
namespace wip {
namespace gst {
#ifdef HAVE_GSTREAMER
// Private implementation of GStreamerSource: binds to a single appsink of a
// (possibly shared) GStreamer pipeline and pulls NV12 frames from it.
class GStreamerSource::Priv
{
public:
    // Owns a freshly built pipeline from its textual description.
    Priv(const std::string& pipeline, const GStreamerSource::OutputType outputType);
    // Attaches to a named appsink of an externally owned pipeline.
    Priv(std::shared_ptr<GStreamerPipelineFacade> pipeline, const std::string& appsinkName,
         const GStreamerSource::OutputType outputType);

    // Pulls the next frame; returns false on end-of-stream.
    bool pull(cv::gapi::wip::Data& data);
    // non-const in difference with GStreamerSource, because contains delayed meta initialization
    GMetaArg descr_of() noexcept;

    virtual ~Priv();

protected:
    // Shares:
    std::shared_ptr<GStreamerPipelineFacade> m_pipeline;

    // Owns:
    GStreamerPtr<GstElement> m_appsink;
    GStreamerPtr<GstSample> m_sample;
    // Non-owning: valid only while m_sample is held.
    GstBuffer* m_buffer = nullptr; // Actual frame memory holder

    // Filled by prepareVideoMeta(); not meaningful before that.
    GstVideoInfo m_videoInfo; // Information about Video frame

    GStreamerSource::OutputType m_outputType = GStreamerSource::OutputType::MAT;

    GMatDesc m_matMeta;
    GFrameDesc m_mediaFrameMeta;
    bool m_isMetaPrepared = false;      // delayed-init guard for metadata
    bool m_isPipelinePlaying = false;   // lazily set on first pullBuffer()
    int64_t m_frameId = 0L;             // monotonically growing sequence id

protected:
    void configureAppsink();
    void prepareVideoMeta();
    int64_t computeTimestamp();

    bool pullBuffer();
    bool retrieveFrame(cv::Mat& data);
    bool retrieveFrame(cv::MediaFrame& data);
};
#else // HAVE_GSTREAMER
// Stub Priv declaration used when OpenCV is built without GStreamer support.
// The definitions (see gstreamersource.cpp) assert at runtime.
class GStreamerSource::Priv
{
public:
    Priv(const std::string& pipeline, const GStreamerSource::OutputType outputType);
    Priv(std::shared_ptr<GStreamerPipelineFacade> pipeline, const std::string& appsinkName,
         const GStreamerSource::OutputType outputType);

    bool pull(cv::gapi::wip::Data& data);
    GMetaArg descr_of() const noexcept;

    virtual ~Priv();
};
#endif // HAVE_GSTREAMER
} // namespace gst
} // namespace wip
} // namespace gapi
} // namespace cv
#endif // OPENCV_GAPI_STREAMING_GSTREAMER_GSTREAMERSOURCE_PRIV_HPP

@ -0,0 +1,188 @@
// This file is part of OpenCV project.
// It is subject to the license terms in the LICENSE file found in the top-level directory
// of this distribution and at http://opencv.org/license.html.
//
// Copyright (C) 2021 Intel Corporation
#include "../test/common/gapi_tests_common.hpp"
#include "../src/streaming/gstreamer/gstreamer_pipeline_facade.hpp"
#include "../src/streaming/gstreamer/gstreamerptr.hpp"
#include <opencv2/ts.hpp>
#include <thread>
#ifdef HAVE_GSTREAMER
#include <gst/app/gstappsink.h>
namespace opencv_test
{
// getElementsByFactoryName() must return every element created from the
// requested factory in a two-branch pipeline.
TEST(GStreamerPipelineFacadeTest, GetElsByFactoryNameUnitTest)
{
    // True if the element was instantiated from the factory with this name.
    auto isMadeByFactory = [](GstElement* element, const std::string& factoryName) {
        cv::gapi::wip::gst::GStreamerPtr<gchar> name(
            gst_object_get_name(GST_OBJECT(gst_element_get_factory(element))));
        return name && (0 == strcmp(name, factoryName.c_str()));
    };

    cv::gapi::wip::gst::GStreamerPipelineFacade
        pipelineFacade("videotestsrc is-live=true pattern=colors num-buffers=10 ! "
                       "videorate ! videoscale ! "
                       "video/x-raw,width=1920,height=1080,framerate=3/1 ! "
                       "appsink name=sink1 "
                       "videotestsrc is-live=true pattern=colors num-buffers=10 ! "
                       "videorate ! videoscale ! "
                       "video/x-raw,width=1920,height=1080,framerate=3/1 ! "
                       "appsink name=sink2");

    const auto sources = pipelineFacade.getElementsByFactoryName("videotestsrc");
    EXPECT_EQ(2u, sources.size());
    for (auto&& src : sources) {
        EXPECT_TRUE(isMadeByFactory(src, "videotestsrc"));
    }

    const auto sinks = pipelineFacade.getElementsByFactoryName("appsink");
    EXPECT_EQ(2u, sinks.size());
    for (auto&& sink : sinks) {
        EXPECT_TRUE(isMadeByFactory(sink, "appsink"));
    }
}
// getElementByName() must resolve appsink elements by their assigned names.
TEST(GStreamerPipelineFacadeTest, GetElByNameUnitTest)
{
    // True if the element carries exactly this GStreamer element name.
    auto hasElementName = [](GstElement* element, const std::string& elementName) {
        cv::gapi::wip::gst::GStreamerPtr<gchar> name(gst_element_get_name(element));
        return name && (0 == strcmp(name, elementName.c_str()));
    };

    cv::gapi::wip::gst::GStreamerPipelineFacade
        pipelineFacade("videotestsrc is-live=true pattern=colors num-buffers=10 ! "
                       "videorate ! videoscale ! "
                       "video/x-raw,width=1920,height=1080,framerate=3/1 ! "
                       "appsink name=sink1 "
                       "videotestsrc is-live=true pattern=colors num-buffers=10 ! "
                       "videorate ! videoscale ! "
                       "video/x-raw,width=1920,height=1080,framerate=3/1 ! "
                       "appsink name=sink2");

    auto* firstSink = pipelineFacade.getElementByName("sink1");
    GAPI_Assert(firstSink != nullptr);
    EXPECT_TRUE(hasElementName(firstSink, "sink1"));

    auto* secondSink = pipelineFacade.getElementByName("sink2");
    GAPI_Assert(secondSink != nullptr);
    EXPECT_TRUE(hasElementName(secondSink, "sink2"));
}
// After completePreroll() the appsink must already hold a preroll sample.
TEST(GStreamerPipelineFacadeTest, CompletePrerollUnitTest)
{
    cv::gapi::wip::gst::GStreamerPipelineFacade
        pipelineFacade("videotestsrc is-live=true pattern=colors num-buffers=10 ! "
                       "videorate ! videoscale ! "
                       "video/x-raw,width=1920,height=1080,framerate=3/1 ! "
                       "appsink name=sink1 "
                       "videotestsrc is-live=true pattern=colors num-buffers=10 ! "
                       "videorate ! videoscale ! "
                       "video/x-raw,width=1920,height=1080,framerate=3/1 ! "
                       "appsink name=sink2");

    auto appsink = pipelineFacade.getElementByName("sink1");
    pipelineFacade.completePreroll();

    // With GStreamer >= 1.10 use the timed pull so the test cannot hang.
    cv::gapi::wip::gst::GStreamerPtr<GstSample> prerollSample(
#if GST_VERSION_MINOR >= 10
        gst_app_sink_try_pull_preroll(GST_APP_SINK(appsink), 5 * GST_SECOND)
#else // GST_VERSION_MINOR < 10
        // TODO: This function may cause hang with some pipelines, need to check whether these
        //       pipelines are really wrong or not?
        gst_app_sink_pull_preroll(GST_APP_SINK(appsink))
#endif // GST_VERSION_MINOR >= 10
        );

    GAPI_Assert(prerollSample != nullptr);
}
// play() must drive the pipeline (observed via one of its appsinks) to the
// PLAYING state with no pending state transition.
TEST(GStreamerPipelineFacadeTest, PlayUnitTest)
{
    cv::gapi::wip::gst::GStreamerPipelineFacade
        pipelineFacade("videotestsrc is-live=true pattern=colors num-buffers=10 ! "
                       "videorate ! videoscale ! "
                       "video/x-raw,width=1920,height=1080,framerate=3/1 ! "
                       "appsink name=sink1 "
                       "videotestsrc is-live=true pattern=colors num-buffers=10 ! "
                       "videorate ! videoscale ! "
                       "video/x-raw,width=1920,height=1080,framerate=3/1 ! "
                       "appsink name=sink2");

    auto appsink = pipelineFacade.getElementByName("sink2");
    pipelineFacade.play();

    // Query the element's state with a 5 second timeout.
    cv::gapi::wip::gst::PipelineState state;
    GstStateChangeReturn status =
        gst_element_get_state(appsink, &state.current, &state.pending, 5 * GST_SECOND);
    EXPECT_EQ(GST_STATE_CHANGE_SUCCESS, status);
    EXPECT_EQ(GST_STATE_PLAYING, state.current);
    EXPECT_EQ(GST_STATE_VOID_PENDING, state.pending);
}
// isPlaying() must report false before play() and true right after it.
TEST(GStreamerPipelineFacadeTest, IsPlayingUnitTest)
{
    cv::gapi::wip::gst::GStreamerPipelineFacade
        pipelineFacade("videotestsrc is-live=true pattern=colors num-buffers=10 ! "
                       "videorate ! videoscale ! "
                       "video/x-raw,width=1920,height=1080,framerate=3/1 ! "
                       "appsink name=sink1 "
                       "videotestsrc is-live=true pattern=colors num-buffers=10 ! "
                       "videorate ! videoscale ! "
                       "video/x-raw,width=1920,height=1080,framerate=3/1 ! "
                       "appsink name=sink2");

    EXPECT_FALSE(pipelineFacade.isPlaying());
    pipelineFacade.play();
    EXPECT_TRUE(pipelineFacade.isPlaying());
}
// Smoke-checks thread safety of the facade: completePreroll(), play() and
// isPlaying() are launched concurrently in every relative order; the test
// passes if no crash/deadlock occurs.
TEST(GStreamerPipelineFacadeTest, MTSafetyUnitTest)
{
    cv::gapi::wip::gst::GStreamerPipelineFacade
        pipelineFacade("videotestsrc is-live=true pattern=colors num-buffers=10 ! "
                       "videorate ! videoscale ! "
                       "video/x-raw,width=1920,height=1080,framerate=3/1 ! "
                       "appsink name=sink1 "
                       "videotestsrc is-live=true pattern=colors num-buffers=10 ! "
                       "videorate ! videoscale ! "
                       "video/x-raw,width=1920,height=1080,framerate=3/1 ! "
                       "appsink name=sink2");

    auto prerollRoutine   = [&pipelineFacade](){ pipelineFacade.completePreroll(); };
    auto playRoutine      = [&pipelineFacade](){ pipelineFacade.play(); };
    auto isPlayingRoutine = [&pipelineFacade](){ pipelineFacade.isPlaying(); };

    using Routine = std::function<void()>;
    // Spawns the three routines on separate threads and waits for all of them.
    auto launchConcurrently = [](const Routine& r1, const Routine& r2, const Routine& r3) {
        std::vector<std::thread> workers { };
        for (auto&& routine : std::vector<Routine>{ r1, r2, r3 }) {
            workers.emplace_back(routine);
        }
        for (auto&& worker : workers) {
            worker.join();
        }
    };

    // All 6 permutations of the three routines:
    launchConcurrently(prerollRoutine, playRoutine, isPlayingRoutine);
    launchConcurrently(prerollRoutine, isPlayingRoutine, playRoutine);
    launchConcurrently(isPlayingRoutine, prerollRoutine, playRoutine);
    launchConcurrently(playRoutine, prerollRoutine, isPlayingRoutine);
    launchConcurrently(playRoutine, isPlayingRoutine, prerollRoutine);
    launchConcurrently(isPlayingRoutine, playRoutine, prerollRoutine);

    EXPECT_TRUE(true);
}
} // namespace opencv_test
#endif // HAVE_GSTREAMER

@ -0,0 +1,401 @@
// This file is part of OpenCV project.
// It is subject to the license terms in the LICENSE file found in the top-level directory
// of this distribution and at http://opencv.org/license.html.
//
// Copyright (C) 2021 Intel Corporation
#include "../test/common/gapi_tests_common.hpp"
#include <opencv2/gapi/streaming/gstreamer/gstreamerpipeline.hpp>
#include <opencv2/gapi/streaming/gstreamer/gstreamersource.hpp>
#include <opencv2/gapi/core.hpp>
#include <opencv2/gapi/cpu/core.hpp>
#include <opencv2/gapi/streaming/meta.hpp>
#include <opencv2/gapi/streaming/format.hpp>
#include <opencv2/gapi/gkernel.hpp>
#include <opencv2/gapi/cpu/gcpukernel.hpp>
#include <opencv2/gapi/gcomputation.hpp>
#include <opencv2/ts.hpp>
#include <regex>
#ifdef HAVE_GSTREAMER
namespace opencv_test
{
// Parametrized fixture: (pipeline description, expected frame size, expected
// number of frames in the stream).
struct GStreamerSourceTest : public TestWithParam<std::tuple<std::string, cv::Size, std::size_t>>
{ };
// Pulls the whole stream in MAT mode through a trivial copy graph and checks
// frame size, type (8UC3 BGR) and total frame count.
TEST_P(GStreamerSourceTest, AccuracyTest)
{
    std::string pipeline;
    cv::Size expectedFrameSize;
    std::size_t streamLength { };
    std::tie(pipeline, expectedFrameSize, streamLength) = GetParam();

    // Graph declaration:
    cv::GMat in;
    auto out = cv::gapi::copy(in);

    cv::GComputation c(cv::GIn(in), cv::GOut(out));

    // Graph compilation for streaming mode:
    auto ccomp = c.compileStreaming();

    EXPECT_TRUE(ccomp);
    EXPECT_FALSE(ccomp.running());

    // GStreamer streaming source configuration:
    ccomp.setSource<cv::gapi::wip::GStreamerSource>(pipeline);

    // Start of streaming:
    ccomp.start();
    EXPECT_TRUE(ccomp.running());

    // Streaming - pulling of frames until the end:
    cv::Mat in_mat_gapi;
    EXPECT_TRUE(ccomp.pull(cv::gout(in_mat_gapi)));
    EXPECT_TRUE(!in_mat_gapi.empty());

    EXPECT_EQ(expectedFrameSize, in_mat_gapi.size());
    EXPECT_EQ(CV_8UC3, in_mat_gapi.type());

    std::size_t framesCount = 1UL;
    while (ccomp.pull(cv::gout(in_mat_gapi))) {
        EXPECT_TRUE(!in_mat_gapi.empty());
        EXPECT_EQ(expectedFrameSize, in_mat_gapi.size());
        EXPECT_EQ(CV_8UC3, in_mat_gapi.type());
        framesCount++;
    }

    // pull() returning false means the stream ended on its own.
    EXPECT_FALSE(ccomp.running());
    ccomp.stop();
    EXPECT_FALSE(ccomp.running());

    EXPECT_EQ(streamLength, framesCount);
}
// Checks the per-frame metadata attached by the source: sequence ids must be
// a contiguous, strictly growing range starting at 0, and timestamps must be
// non-decreasing.
TEST_P(GStreamerSourceTest, TimestampsTest)
{
    std::string pipeline;
    std::size_t streamLength { };
    std::tie(pipeline, std::ignore, streamLength) = GetParam();

    // Graph declaration:
    cv::GMat in;
    cv::GMat copied = cv::gapi::copy(in);
    cv::GOpaque<int64_t> outId = cv::gapi::streaming::seq_id(copied);
    cv::GOpaque<int64_t> outTs = cv::gapi::streaming::timestamp(copied);

    cv::GComputation c(cv::GIn(in), cv::GOut(outId, outTs));

    // Graph compilation for streaming mode:
    auto ccomp = c.compileStreaming();

    EXPECT_TRUE(ccomp);
    EXPECT_FALSE(ccomp.running());

    // GStreamer streaming source configuration:
    ccomp.setSource<cv::gapi::wip::GStreamerSource>(pipeline);

    // Start of streaming:
    ccomp.start();
    EXPECT_TRUE(ccomp.running());

    // Streaming - pulling of frames until the end:
    int64_t seqId;
    int64_t timestamp;
    std::vector<int64_t> allSeqIds;
    std::vector<int64_t> allTimestamps;

    while (ccomp.pull(cv::gout(seqId, timestamp))) {
        allSeqIds.push_back(seqId);
        allTimestamps.push_back(timestamp);
    }

    EXPECT_FALSE(ccomp.running());
    ccomp.stop();
    EXPECT_FALSE(ccomp.running());

    // Sequence ids: 0 .. streamLength-1, sorted, all unique.
    EXPECT_EQ(0L, allSeqIds.front());
    EXPECT_EQ(int64_t(streamLength) - 1, allSeqIds.back());
    EXPECT_EQ(streamLength, allSeqIds.size());
    EXPECT_TRUE(std::is_sorted(allSeqIds.begin(), allSeqIds.end()));
    EXPECT_EQ(allSeqIds.size(), std::set<int64_t>(allSeqIds.begin(), allSeqIds.end()).size());

    // Timestamps: one per frame, non-decreasing.
    EXPECT_EQ(streamLength, allTimestamps.size());
    EXPECT_TRUE(std::is_sorted(allTimestamps.begin(), allTimestamps.end()));
}
// Test kernel: copies an NV12 GFrame into two separate cv::Mat-s
// (Y plane: 8UC1, full size; UV plane: 8UC2, half size).
G_TYPED_KERNEL(GGstFrameCopyToNV12, <std::tuple<cv::GMat,cv::GMat>(GFrame)>,
               "org.opencv.test.gstframe_copy_to_nv12")
{
    static std::tuple<GMatDesc, GMatDesc> outMeta(GFrameDesc desc) {
        GMatDesc y  { CV_8U, 1, desc.size, false };
        GMatDesc uv { CV_8U, 2, desc.size / 2, false };
        return std::make_tuple(y, uv);
    }
};

// CPU implementation: wraps the frame's mapped planes in non-owning Mats and
// deep-copies them into the outputs while the read access view is alive.
GAPI_OCV_KERNEL(GOCVGstFrameCopyToNV12, GGstFrameCopyToNV12)
{
    static void run(const cv::MediaFrame& in, cv::Mat& y, cv::Mat& uv)
    {
        auto view = in.access(cv::MediaFrame::Access::R);
        cv::Mat ly(y.size(), y.type(), view.ptr[0], view.stride[0]);
        cv::Mat luv(uv.size(), uv.type(), view.ptr[1], view.stride[1]);
        ly.copyTo(y);
        luv.copyTo(uv);
    }
};
// Pulls the whole stream in FRAME (MediaFrame/NV12) mode and checks plane
// sizes/types and the total frame count.
TEST_P(GStreamerSourceTest, GFrameTest)
{
    std::string pipeline;
    cv::Size expectedFrameSize;
    std::size_t streamLength { };
    std::tie(pipeline, expectedFrameSize, streamLength) = GetParam();

    // Graph declaration:
    cv::GFrame in;
    cv::GMat copiedY, copiedUV;
    std::tie(copiedY, copiedUV) = GGstFrameCopyToNV12::on(in);

    cv::GComputation c(cv::GIn(in), cv::GOut(copiedY, copiedUV));

    // Graph compilation for streaming mode:
    auto ccomp = c.compileStreaming(cv::compile_args(cv::gapi::kernels<GOCVGstFrameCopyToNV12>()));

    EXPECT_TRUE(ccomp);
    EXPECT_FALSE(ccomp.running());

    // GStreamer streaming source configuration (FRAME output mode):
    ccomp.setSource<cv::gapi::wip::GStreamerSource>
        (pipeline, cv::gapi::wip::GStreamerSource::OutputType::FRAME);

    // Start of streaming:
    ccomp.start();
    EXPECT_TRUE(ccomp.running());

    // Streaming - pulling of frames until the end:
    cv::Mat y_mat, uv_mat;
    EXPECT_TRUE(ccomp.pull(cv::gout(y_mat, uv_mat)));
    EXPECT_TRUE(!y_mat.empty());
    EXPECT_TRUE(!uv_mat.empty());

    // NV12: Y plane matches the frame size; UV plane is subsampled by 2.
    cv::Size expectedYSize = expectedFrameSize;
    cv::Size expectedUVSize = expectedFrameSize / 2;

    EXPECT_EQ(expectedYSize, y_mat.size());
    EXPECT_EQ(expectedUVSize, uv_mat.size());
    EXPECT_EQ(CV_8UC1, y_mat.type());
    EXPECT_EQ(CV_8UC2, uv_mat.type());

    std::size_t framesCount = 1UL;
    while (ccomp.pull(cv::gout(y_mat, uv_mat))) {
        EXPECT_TRUE(!y_mat.empty());
        EXPECT_TRUE(!uv_mat.empty());
        EXPECT_EQ(expectedYSize, y_mat.size());
        EXPECT_EQ(expectedUVSize, uv_mat.size());
        EXPECT_EQ(CV_8UC1, y_mat.type());
        EXPECT_EQ(CV_8UC2, uv_mat.type());
        framesCount++;
    }

    EXPECT_FALSE(ccomp.running());
    ccomp.stop();
    EXPECT_FALSE(ccomp.running());

    EXPECT_EQ(streamLength, framesCount);
}
// FIXME: Need to launch with sudo. May be infrastructure problems.
// TODO: It is needed to add tests for streaming from native KMB camera: kmbcamsrc
//       GStreamer element.

// Live (camera-like) source, 10 frames at 1920x1080.
INSTANTIATE_TEST_CASE_P(CameraEmulatingPipeline, GStreamerSourceTest,
                        Combine(Values("videotestsrc is-live=true pattern=colors num-buffers=10 ! "
                                       "videorate ! videoscale ! "
                                       "video/x-raw,width=1920,height=1080,framerate=3/1 ! "
                                       "appsink"),
                                Values(cv::Size(1920, 1080)),
                                Values(10UL)));

// Non-live (file-like) source, 10 frames at 640x420.
INSTANTIATE_TEST_CASE_P(FileEmulatingPipeline, GStreamerSourceTest,
                        Combine(Values("videotestsrc pattern=colors num-buffers=10 ! "
                                       "videorate ! videoscale ! "
                                       "video/x-raw,width=640,height=420,framerate=3/1 ! "
                                       "appsink"),
                                Values(cv::Size(640, 420)),
                                Values(10UL)));

// Two live branches in one pipeline; only the appsink branch is consumed.
INSTANTIATE_TEST_CASE_P(MultipleLiveSources, GStreamerSourceTest,
                        Combine(Values("videotestsrc is-live=true pattern=colors num-buffers=10 ! "
                                       "videoscale ! video/x-raw,width=1280,height=720 ! appsink "
                                       "videotestsrc is-live=true pattern=colors num-buffers=10 ! "
                                       "fakesink"),
                                Values(cv::Size(1280, 720)),
                                Values(10UL)));

// Same as above, but with non-live sources.
INSTANTIATE_TEST_CASE_P(MultipleNotLiveSources, GStreamerSourceTest,
                        Combine(Values("videotestsrc pattern=colors num-buffers=10 ! "
                                       "videoscale ! video/x-raw,width=1280,height=720 ! appsink "
                                       "videotestsrc pattern=colors num-buffers=10 ! "
                                       "fakesink"),
                                Values(cv::Size(1280, 720)),
                                Values(10UL)));
// Smoke check: two named appsink branches of a single GStreamer pipeline feed
// a two-input G-API graph; every pulled result must be a non-empty BGR image.
TEST(GStreamerMultiSourceSmokeTest, Test)
{
    // Trivial two-input graph: element-wise sum of both streams.
    cv::GMat lhs, rhs;
    cv::GMat sum = cv::gapi::add(lhs, rhs);
    cv::GComputation graph(cv::GIn(lhs, rhs), cv::GOut(sum));

    // Compile the graph for streaming execution.
    auto streaming = graph.compileStreaming();
    EXPECT_TRUE(streaming);
    EXPECT_FALSE(streaming.running());

    // One pipeline description with two live branches, 10 buffers each,
    // exposed through the named sinks "sink1" and "sink2".
    cv::gapi::wip::GStreamerPipeline
        pipeline("videotestsrc is-live=true pattern=colors num-buffers=10 ! "
                 "videorate ! videoscale ! "
                 "video/x-raw,width=1920,height=1080,framerate=3/1 ! "
                 "appsink name=sink1 "
                 "videotestsrc is-live=true pattern=colors num-buffers=10 ! "
                 "videorate ! videoscale ! "
                 "video/x-raw,width=1920,height=1080,framerate=3/1 ! "
                 "appsink name=sink2");

    // Bind both sinks as the graph's streaming sources.
    auto source1 = pipeline.getStreamingSource("sink1");
    auto source2 = pipeline.getStreamingSource("sink2");
    streaming.setSource(cv::gin(source1, source2));

    // Run the stream and drain it, validating each produced frame.
    streaming.start();
    EXPECT_TRUE(streaming.running());

    cv::Mat out_mat;
    EXPECT_TRUE(streaming.pull(cv::gout(out_mat)));
    EXPECT_TRUE(!out_mat.empty());
    EXPECT_EQ(CV_8UC3, out_mat.type());
    while (streaming.pull(cv::gout(out_mat))) {
        EXPECT_TRUE(!out_mat.empty());
        EXPECT_EQ(CV_8UC3, out_mat.type());
    }

    // Once pull() fails the stream has ended; stop() must remain a no-op.
    EXPECT_FALSE(streaming.running());
    streaming.stop();
    EXPECT_FALSE(streaming.running());
}
// Fixture for multi-source tests, parameterized over the graph to execute and
// the GStreamerSource output type (MAT for cv::Mat inputs, FRAME for GFrames).
struct GStreamerMultiSourceTest :
    public TestWithParam<std::tuple<cv::GComputation, cv::gapi::wip::GStreamerSource::OutputType>>
{ };
// Verifies pixel-exact data delivery: frames pulled from a looped two-sink
// pipeline must match reference images decoded once via standalone
// GStreamerSource objects.
TEST_P(GStreamerMultiSourceTest, ImageDataTest)
{
    std::string pathToLeftIm = findDataFile("cv/stereomatching/datasets/tsukuba/im6.png");
    std::string pathToRightIm = findDataFile("cv/stereomatching/datasets/tsukuba/im2.png");

    // Build reference mats by decoding each PNG once through a one-shot source
    // ("LOC" placeholder is substituted with the actual file path).
    std::string pipelineToReadImage("filesrc location=LOC ! pngdec ! videoconvert ! "
                                    "videoscale ! video/x-raw,format=NV12 ! appsink");
    cv::gapi::wip::GStreamerSource leftImageProvider(
        std::regex_replace(pipelineToReadImage, std::regex("LOC"), pathToLeftIm));
    cv::gapi::wip::GStreamerSource rightImageProvider(
        std::regex_replace(pipelineToReadImage, std::regex("LOC"), pathToRightIm));

    cv::gapi::wip::Data leftImData, rightImData;
    leftImageProvider.pull(leftImData);
    rightImageProvider.pull(rightImData);
    cv::Mat leftRefMat = cv::util::get<cv::Mat>(leftImData);
    cv::Mat rightRefMat = cv::util::get<cv::Mat>(rightImData);

    // Retrieve test parameters:
    std::tuple<cv::GComputation, cv::gapi::wip::GStreamerSource::OutputType> params = GetParam();
    cv::GComputation extractImage = std::move(std::get<0>(params));
    cv::gapi::wip::GStreamerSource::OutputType outputType = std::get<1>(params);

    // Graph compilation for streaming mode:
    auto compiled =
        extractImage.compileStreaming();
    EXPECT_TRUE(compiled);
    EXPECT_FALSE(compiled.running());

    // multifilesrc with loop=true produces an endless stream of the same
    // image, so the pull loop below is bounded by an explicit counter.
    cv::gapi::wip::GStreamerPipeline
        pipeline(std::string("multifilesrc location=" + pathToLeftIm + " index=0 loop=true ! "
                             "pngdec ! videoconvert ! videoscale ! video/x-raw,format=NV12 ! "
                             "appsink name=sink1 ") +
                 std::string("multifilesrc location=" + pathToRightIm + " index=0 loop=true ! "
                             "pngdec ! videoconvert ! videoscale ! video/x-raw,format=NV12 ! "
                             "appsink name=sink2"));

    // GStreamer streaming sources configuration:
    auto src1 = pipeline.getStreamingSource("sink1", outputType);
    auto src2 = pipeline.getStreamingSource("sink2", outputType);
    compiled.setSource(cv::gin(src1, src2));

    // Start of streaming:
    compiled.start();
    EXPECT_TRUE(compiled.running());

    // Streaming - pulling of frames: each pulled pair must be bit-exact equal
    // to the references (NORM_INF of the difference is zero).
    cv::Mat in_mat1, in_mat2;
    std::size_t counter { }, limit { 10 };
    while(compiled.pull(cv::gout(in_mat1, in_mat2)) && (counter < limit)) {
        EXPECT_EQ(0, cv::norm(in_mat1, leftRefMat, cv::NORM_INF));
        EXPECT_EQ(0, cv::norm(in_mat2, rightRefMat, cv::NORM_INF));
        ++counter;
    }

    // The stream is infinite, so it must be stopped explicitly here.
    compiled.stop();
    EXPECT_FALSE(compiled.running());
}
// Pass-through graph over two GMat inputs; sources deliver plain cv::Mat data.
INSTANTIATE_TEST_CASE_P(GStreamerMultiSourceViaGMatsTest, GStreamerMultiSourceTest,
                        Combine(Values(cv::GComputation([]()
                                {
                                    cv::GMat in1, in2;
                                    return cv::GComputation(cv::GIn(in1, in2),
                                                            cv::GOut(cv::gapi::copy(in1),
                                                                     cv::gapi::copy(in2)));
                                })),
                                Values(cv::gapi::wip::GStreamerSource::OutputType::MAT)));

// BGR-extraction graph over two GFrame inputs; sources deliver MediaFrames.
INSTANTIATE_TEST_CASE_P(GStreamerMultiSourceViaGFramesTest, GStreamerMultiSourceTest,
                        Combine(Values(cv::GComputation([]()
                                {
                                    cv::GFrame in1, in2;
                                    return cv::GComputation(cv::GIn(in1, in2),
                                                            cv::GOut(cv::gapi::streaming::BGR(in1),
                                                                     cv::gapi::streaming::BGR(in2)));
                                })),
                                Values(cv::gapi::wip::GStreamerSource::OutputType::FRAME)));
} // namespace opencv_test
#endif // HAVE_GSTREAMER