diff --git a/modules/gapi/CMakeLists.txt b/modules/gapi/CMakeLists.txt index 79ae30ae57..855ce27088 100644 --- a/modules/gapi/CMakeLists.txt +++ b/modules/gapi/CMakeLists.txt @@ -47,12 +47,14 @@ file(GLOB gapi_ext_hdrs "${CMAKE_CURRENT_LIST_DIR}/include/opencv2/${name}/infer/*.hpp" "${CMAKE_CURRENT_LIST_DIR}/include/opencv2/${name}/ocl/*.hpp" "${CMAKE_CURRENT_LIST_DIR}/include/opencv2/${name}/own/*.hpp" + "${CMAKE_CURRENT_LIST_DIR}/include/opencv2/${name}/plaidml/*.hpp" + "${CMAKE_CURRENT_LIST_DIR}/include/opencv2/${name}/python/*.hpp" "${CMAKE_CURRENT_LIST_DIR}/include/opencv2/${name}/render/*.hpp" "${CMAKE_CURRENT_LIST_DIR}/include/opencv2/${name}/s11n/*.hpp" "${CMAKE_CURRENT_LIST_DIR}/include/opencv2/${name}/streaming/*.hpp" - "${CMAKE_CURRENT_LIST_DIR}/include/opencv2/${name}/plaidml/*.hpp" + "${CMAKE_CURRENT_LIST_DIR}/include/opencv2/${name}/streaming/gstreamer/*.hpp" + "${CMAKE_CURRENT_LIST_DIR}/include/opencv2/${name}/streaming/onevpl/*.hpp" "${CMAKE_CURRENT_LIST_DIR}/include/opencv2/${name}/util/*.hpp" - "${CMAKE_CURRENT_LIST_DIR}/include/opencv2/${name}/python/*.hpp" ) set(gapi_srcs @@ -163,7 +165,7 @@ set(gapi_srcs src/backends/ie/bindings_ie.cpp src/backends/python/gpythonbackend.cpp - # Streaming source + # OpenVPL Streaming source src/streaming/onevpl/source.cpp src/streaming/onevpl/source_priv.cpp src/streaming/onevpl/file_data_provider.cpp @@ -186,6 +188,14 @@ set(gapi_srcs src/streaming/onevpl/cfg_param_device_selector.cpp src/streaming/onevpl/device_selector_interface.cpp + # GStreamer Streaming source + src/streaming/gstreamer/gstreamer_pipeline_facade.cpp + src/streaming/gstreamer/gstreamerpipeline.cpp + src/streaming/gstreamer/gstreamersource.cpp + src/streaming/gstreamer/gstreamer_buffer_utils.cpp + src/streaming/gstreamer/gstreamer_media_adapter.cpp + src/streaming/gstreamer/gstreamerenv.cpp + # Utils (ITT tracing) src/utils/itt.cpp ) @@ -283,6 +293,15 @@ if(HAVE_GAPI_ONEVPL) endif() endif() +if(HAVE_GSTREAMER) + if(TARGET opencv_test_gapi) + ocv_target_compile_definitions(opencv_test_gapi PRIVATE -DHAVE_GSTREAMER) + ocv_target_link_libraries(opencv_test_gapi PRIVATE ocv.3rdparty.gstreamer) + endif() + ocv_target_compile_definitions(${the_module} PRIVATE -DHAVE_GSTREAMER) + ocv_target_link_libraries(${the_module} PRIVATE ocv.3rdparty.gstreamer) +endif() + if(WIN32) # Required for htonl/ntohl on Windows ocv_target_link_libraries(${the_module} PRIVATE wsock32 ws2_32) diff --git a/modules/gapi/include/opencv2/gapi/streaming/gstreamer/gstreamerpipeline.hpp b/modules/gapi/include/opencv2/gapi/streaming/gstreamer/gstreamerpipeline.hpp new file mode 100644 index 0000000000..83afc99393 --- /dev/null +++ b/modules/gapi/include/opencv2/gapi/streaming/gstreamer/gstreamerpipeline.hpp @@ -0,0 +1,47 @@ +// This file is part of OpenCV project. +// It is subject to the license terms in the LICENSE file found in the top-level directory +// of this distribution and at http://opencv.org/license.html. +// +// Copyright (C) 2021 Intel Corporation + +#ifndef OPENCV_GAPI_STREAMING_GSTREAMER_GSTREAMERPIPELINE_HPP +#define OPENCV_GAPI_STREAMING_GSTREAMER_GSTREAMERPIPELINE_HPP + +#include +#include + +#include +#include +#include + +namespace cv { +namespace gapi { +namespace wip { +namespace gst { + +class GAPI_EXPORTS GStreamerPipeline +{ +public: + class Priv; + + explicit GStreamerPipeline(const std::string& pipeline); + IStreamSource::Ptr getStreamingSource(const std::string& appsinkName, + const GStreamerSource::OutputType outputType = + GStreamerSource::OutputType::MAT); + virtual ~GStreamerPipeline(); + +protected: + explicit GStreamerPipeline(std::unique_ptr priv); + + std::unique_ptr m_priv; +}; + +} // namespace gst + +using GStreamerPipeline = gst::GStreamerPipeline; + +} // namespace wip +} // namespace gapi +} // namespace cv + +#endif // OPENCV_GAPI_STREAMING_GSTREAMER_GSTREAMERPIPELINE_HPP diff --git a/modules/gapi/include/opencv2/gapi/streaming/gstreamer/gstreamersource.hpp b/modules/gapi/include/opencv2/gapi/streaming/gstreamer/gstreamersource.hpp new file mode 100644 index 0000000000..b81bad31b8 --- /dev/null +++ b/modules/gapi/include/opencv2/gapi/streaming/gstreamer/gstreamersource.hpp @@ -0,0 +1,89 @@ +// This file is part of OpenCV project. +// It is subject to the license terms in the LICENSE file found in the top-level directory +// of this distribution and at http://opencv.org/license.html. +// +// Copyright (C) 2021 Intel Corporation + +#ifndef OPENCV_GAPI_STREAMING_GSTREAMER_GSTREAMERSOURCE_HPP +#define OPENCV_GAPI_STREAMING_GSTREAMER_GSTREAMERSOURCE_HPP + +#include +#include + +#include + +namespace cv { +namespace gapi { +namespace wip { +namespace gst { + +/** + * @brief OpenCV's GStreamer streaming source. + * Streams cv::Mat-s/cv::MediaFrame from passed GStreamer pipeline. + * + * This class implements IStreamSource interface. + * + * To create GStreamerSource instance you need to pass 'pipeline' and, optionally, 'outputType' + * arguments into constructor. + * 'pipeline' should represent GStreamer pipeline in form of textual description. + * Almost any custom pipeline is supported which can be successfully ran via gst-launch. + * The only two limitations are: + * - there should be __one__ appsink element in the pipeline to pass data to OpenCV app. + * Pipeline can actually contain many sink elements, but it must have one and only one + * appsink among them. + * + * - data passed to appsink should be video-frame in NV12 format. + * + * 'outputType' is used to select type of output data to produce: 'cv::MediaFrame' or 'cv::Mat'. + * To produce 'cv::MediaFrame'-s you need to pass 'GStreamerSource::OutputType::FRAME' and, + * correspondingly, 'GStreamerSource::OutputType::MAT' to produce 'cv::Mat'-s. + * Please note, that in the last case, output 'cv::Mat' will be of BGR format, internal conversion + * from NV12 GStreamer data will happen. + * Default value for 'outputType' is 'GStreamerSource::OutputType::MAT'. + * + * @note Stream sources are passed to G-API via shared pointers, so please use gapi::make_src<> + * to create objects and ptr() to pass a GStreamerSource to cv::gin(). + * + * @note You need to build OpenCV with GStreamer support to use this class. + */ + +class GStreamerPipelineFacade; + +class GAPI_EXPORTS GStreamerSource : public IStreamSource +{ +public: + class Priv; + + // Indicates what type of data should be produced by GStreamerSource: cv::MediaFrame or cv::Mat + enum class OutputType { + FRAME, + MAT + }; + + GStreamerSource(const std::string& pipeline, + const GStreamerSource::OutputType outputType = + GStreamerSource::OutputType::MAT); + GStreamerSource(std::shared_ptr pipeline, + const std::string& appsinkName, + const GStreamerSource::OutputType outputType = + GStreamerSource::OutputType::MAT); + + bool pull(cv::gapi::wip::Data& data) override; + GMetaArg descr_of() const override; + ~GStreamerSource() override; + +protected: + explicit GStreamerSource(std::unique_ptr priv); + + std::unique_ptr m_priv; +}; + +} // namespace gst + +using GStreamerSource = gst::GStreamerSource; + +} // namespace wip +} // namespace gapi +} // namespace cv + +#endif // OPENCV_GAPI_STREAMING_GSTREAMER_GSTREAMERSOURCE_HPP diff --git a/modules/gapi/include/opencv2/gapi/streaming/onevpl/device_selector_interface.hpp b/modules/gapi/include/opencv2/gapi/streaming/onevpl/device_selector_interface.hpp index ca19849d72..04f8cae02a 100644 --- a/modules/gapi/include/opencv2/gapi/streaming/onevpl/device_selector_interface.hpp +++ b/modules/gapi/include/opencv2/gapi/streaming/onevpl/device_selector_interface.hpp @@ -19,7 +19,7 @@ namespace gapi { namespace wip { namespace onevpl { -enum class AccelType : uint8_t { +enum class AccelType: uint8_t { HOST, DX11, diff --git a/modules/gapi/src/streaming/gstreamer/gstreamer_buffer_utils.cpp b/modules/gapi/src/streaming/gstreamer/gstreamer_buffer_utils.cpp new file mode 100644 index 0000000000..227013105a --- /dev/null +++ b/modules/gapi/src/streaming/gstreamer/gstreamer_buffer_utils.cpp @@ -0,0 +1,27 @@ +// This file is part of OpenCV project. +// It is subject to the license terms in the LICENSE file found in the top-level directory +// of this distribution and at http://opencv.org/license.html. +// +// Copyright (C) 2021 Intel Corporation + +#include "gstreamer_buffer_utils.hpp" +#include "gstreamerptr.hpp" +#include + +#ifdef HAVE_GSTREAMER +namespace cv { +namespace gapi { +namespace wip { +namespace gstreamer_utils { + +void mapBufferToFrame(GstBuffer& buffer, GstVideoInfo& info, GstVideoFrame& frame, + GstMapFlags mapFlags) { + bool mapped = gst_video_frame_map(&frame, &info, &buffer, mapFlags); + GAPI_Assert(mapped && "Failed to map GStreamer buffer to system memory as video-frame!"); +} + +} // namespace gstreamer_utils +} // namespace wip +} // namespace gapi +} // namespace cv +#endif // HAVE_GSTREAMER diff --git a/modules/gapi/src/streaming/gstreamer/gstreamer_buffer_utils.hpp b/modules/gapi/src/streaming/gstreamer/gstreamer_buffer_utils.hpp new file mode 100644 index 0000000000..3e6f908e0f --- /dev/null +++ b/modules/gapi/src/streaming/gstreamer/gstreamer_buffer_utils.hpp @@ -0,0 +1,27 @@ +// This file is part of OpenCV project. +// It is subject to the license terms in the LICENSE file found in the top-level directory +// of this distribution and at http://opencv.org/license.html. +// +// Copyright (C) 2021 Intel Corporation + +#ifndef OPENCV_GAPI_STREAMING_GSTREAMER_GSTREAMER_BUFFER_UTILS_HPP +#define OPENCV_GAPI_STREAMING_GSTREAMER_GSTREAMER_BUFFER_UTILS_HPP + +#ifdef HAVE_GSTREAMER +#include +#include + +namespace cv { +namespace gapi { +namespace wip { +namespace gstreamer_utils { + +void mapBufferToFrame(GstBuffer& buffer, GstVideoInfo& info, GstVideoFrame& frame, + GstMapFlags map_flags); + +} // namespace gstreamer_utils +} // namespace wip +} // namespace gapi +} // namespace cv +#endif // HAVE_GSTREAMER +#endif // OPENCV_GAPI_STREAMING_GSTREAMER_GSTREAMER_BUFFER_UTILS_HPP diff --git a/modules/gapi/src/streaming/gstreamer/gstreamer_media_adapter.cpp b/modules/gapi/src/streaming/gstreamer/gstreamer_media_adapter.cpp new file mode 100644 index 0000000000..9019289ae4 --- /dev/null +++ b/modules/gapi/src/streaming/gstreamer/gstreamer_media_adapter.cpp @@ -0,0 +1,122 @@ +// This file is part of OpenCV project. +// It is subject to the license terms in the LICENSE file found in the top-level directory +// of this distribution and at http://opencv.org/license.html. +// +// Copyright (C) 2021 Intel Corporation + +#include "gstreamer_media_adapter.hpp" +#include "gstreamer_buffer_utils.hpp" + +#ifdef HAVE_GSTREAMER +namespace cv { +namespace gapi { +namespace wip { +namespace gst { + +GStreamerMediaAdapter::GStreamerMediaAdapter(const cv::GFrameDesc& frameDesc, + GstVideoInfo* videoInfo, + GstBuffer* buffer) : + m_frameDesc(frameDesc), + m_videoInfo(gst_video_info_copy(videoInfo)), + m_buffer(gst_buffer_ref(buffer)), + m_isMapped(false) +{ +#if GST_VERSION_MINOR >= 10 + // Check that GstBuffer has mono-view, so we can retrieve only one video-meta + GAPI_Assert((gst_buffer_get_flags(m_buffer) & GST_VIDEO_BUFFER_FLAG_MULTIPLE_VIEW) == 0); +#endif // GST_VERSION_MINOR >= 10 + + GstVideoMeta* videoMeta = gst_buffer_get_video_meta(m_buffer); + if (videoMeta != nullptr) { + m_strides = { videoMeta->stride[0], videoMeta->stride[1] }; + m_offsets = { videoMeta->offset[0], videoMeta->offset[1] }; + } else { + m_strides = { GST_VIDEO_INFO_PLANE_STRIDE(m_videoInfo.get(), 0), + GST_VIDEO_INFO_PLANE_STRIDE(m_videoInfo.get(), 1) }; + m_offsets = { GST_VIDEO_INFO_PLANE_OFFSET(m_videoInfo.get(), 0), + GST_VIDEO_INFO_PLANE_OFFSET(m_videoInfo.get(), 1) }; + } +} + +GStreamerMediaAdapter::~GStreamerMediaAdapter() { + if (m_isMapped.load(std::memory_order_acquire)) { + gst_video_frame_unmap(&m_videoFrame); + m_isMapped.store(false, std::memory_order_release); + m_mappedForWrite.store(false); + } +} + +cv::GFrameDesc GStreamerMediaAdapter::meta() const { + return m_frameDesc; +} + +cv::MediaFrame::View GStreamerMediaAdapter::access(cv::MediaFrame::Access access) { + GAPI_Assert(access == cv::MediaFrame::Access::R || + access == cv::MediaFrame::Access::W); + static std::atomic thread_counters { }; + ++thread_counters; + + // NOTE: Framework guarantees that there should be no parallel accesses to the frame + // memory if is accessing for write. + if (access == cv::MediaFrame::Access::W && !m_mappedForWrite.load(std::memory_order_acquire)) { + GAPI_Assert(thread_counters > 1 && + "Multiple access to view during mapping for write detected!"); + gst_video_frame_unmap(&m_videoFrame); + m_isMapped.store(false); + } + + if (!m_isMapped.load(std::memory_order_acquire)) { + + std::lock_guard lock(m_mutex); + + if(!m_isMapped.load(std::memory_order_relaxed)) { + + GAPI_Assert(GST_VIDEO_INFO_N_PLANES(m_videoInfo.get()) == 2); + GAPI_Assert(GST_VIDEO_INFO_FORMAT(m_videoInfo.get()) == GST_VIDEO_FORMAT_NV12); + + // TODO: Use RAII for map/unmap + if (access == cv::MediaFrame::Access::W) { + gstreamer_utils::mapBufferToFrame(*m_buffer, *m_videoInfo, m_videoFrame, + GST_MAP_WRITE); + m_mappedForWrite.store(true, std::memory_order_release); + } else { + gstreamer_utils::mapBufferToFrame(*m_buffer, *m_videoInfo, m_videoFrame, + GST_MAP_READ); + } + + GAPI_Assert(GST_VIDEO_FRAME_PLANE_STRIDE(&m_videoFrame, 0) == m_strides[0]); + GAPI_Assert(GST_VIDEO_FRAME_PLANE_STRIDE(&m_videoFrame, 1) == m_strides[1]); + GAPI_Assert(GST_VIDEO_FRAME_PLANE_OFFSET(&m_videoFrame, 0) == m_offsets[0]); + GAPI_Assert(GST_VIDEO_FRAME_PLANE_OFFSET(&m_videoFrame, 1) == m_offsets[1]); + + m_isMapped.store(true, std::memory_order_release); + } + } + + cv::MediaFrame::View::Ptrs ps { + static_cast(GST_VIDEO_FRAME_PLANE_DATA(&m_videoFrame, 0)) + m_offsets[0], // Y-plane + static_cast(GST_VIDEO_FRAME_PLANE_DATA(&m_videoFrame, 0)) + m_offsets[1], // UV-plane + nullptr, + nullptr + }; + + cv::MediaFrame::View::Strides ss = { + static_cast(m_strides[0]), // Y-plane stride + static_cast(m_strides[1]), // UV-plane stride + 0u, + 0u + }; + + --thread_counters; + return cv::MediaFrame::View(std::move(ps), std::move(ss)); +} + +cv::util::any GStreamerMediaAdapter::blobParams() const { + GAPI_Assert(false && "No implementation for GStreamerMediaAdapter::blobParams()"); +} + +} // namespace gst +} // namespace wip +} // namespace gapi +} // namespace cv +#endif // HAVE_GSTREAMER diff --git a/modules/gapi/src/streaming/gstreamer/gstreamer_media_adapter.hpp b/modules/gapi/src/streaming/gstreamer/gstreamer_media_adapter.hpp new file mode 100644 index 0000000000..4c5c137b0d --- /dev/null +++ b/modules/gapi/src/streaming/gstreamer/gstreamer_media_adapter.hpp @@ -0,0 +1,63 @@ +// This file is part of OpenCV project. +// It is subject to the license terms in the LICENSE file found in the top-level directory +// of this distribution and at http://opencv.org/license.html. +// +// Copyright (C) 2021 Intel Corporation + +#ifndef OPENCV_GAPI_STREAMING_GSTREAMER_GSTREAMER_MEDIA_ADAPTER_HPP +#define OPENCV_GAPI_STREAMING_GSTREAMER_GSTREAMER_MEDIA_ADAPTER_HPP + +// #include +// #include + +#include "gstreamerptr.hpp" +#include + +#include +#include + +#ifdef HAVE_GSTREAMER +#include +#include + +namespace cv { +namespace gapi { +namespace wip { +namespace gst { + +class GStreamerMediaAdapter : public cv::MediaFrame::IAdapter { +public: + explicit GStreamerMediaAdapter(const cv::GFrameDesc& frameDesc, + GstVideoInfo* videoInfo, + GstBuffer* buffer); + + ~GStreamerMediaAdapter() override; + + virtual cv::GFrameDesc meta() const override; + + cv::MediaFrame::View access(cv::MediaFrame::Access access) override; + + cv::util::any blobParams() const override; + +protected: + cv::GFrameDesc m_frameDesc; + + GStreamerPtr m_videoInfo; + GStreamerPtr m_buffer; + + std::vector m_strides; + std::vector m_offsets; + + GstVideoFrame m_videoFrame; + + std::atomic m_isMapped; + std::atomic m_mappedForWrite; + std::mutex m_mutex; +}; + +} // namespace gst +} // namespace wip +} // namespace gapi +} // namespace cv +#endif // HAVE_GSTREAMER +#endif // OPENCV_GAPI_STREAMING_GSTREAMER_GSTREAMER_MEDIA_ADAPTER_HPP diff --git a/modules/gapi/src/streaming/gstreamer/gstreamer_pipeline_facade.cpp b/modules/gapi/src/streaming/gstreamer/gstreamer_pipeline_facade.cpp new file mode 100644 index 0000000000..cd782537ca --- /dev/null +++ b/modules/gapi/src/streaming/gstreamer/gstreamer_pipeline_facade.cpp @@ -0,0 +1,314 @@ +// This file is part of OpenCV project. +// It is subject to the license terms in the LICENSE file found in the top-level directory +// of this distribution and at http://opencv.org/license.html. +// +// Copyright (C) 2021 Intel Corporation + +#include "gstreamerenv.hpp" + +#include "gstreamer_pipeline_facade.hpp" + +#include + +#include + +#include + +#ifdef HAVE_GSTREAMER +#include +#include +#include +#include + +namespace cv { +namespace gapi { +namespace wip { +namespace gst { + +GStreamerPipelineFacade::GStreamerPipelineFacade(): + m_isPrerolled(false), + m_isPlaying(false) + { } + +GStreamerPipelineFacade::GStreamerPipelineFacade(const std::string& pipelineDesc): + GStreamerPipelineFacade() +{ + m_pipelineDesc = pipelineDesc; + + // Initialize GStreamer library: + GStreamerEnv::init(); + + // Create GStreamer pipeline: + GError* error = NULL; + // gst_parse_launch [transfer floating] + m_pipeline = GST_ELEMENT(g_object_ref_sink( + gst_parse_launch(m_pipelineDesc.c_str(), &error))); + + GStreamerPtr err(error); + + if (err) + { + cv::util::throw_error( + std::runtime_error("Error in parsing pipeline: " + std::string(err->message))); + } +} + +// The destructors are noexcept by default (since C++11). +GStreamerPipelineFacade::~GStreamerPipelineFacade() +{ + // There is no mutex acquisition here, because we assume that no one will call this method + // directly. + + // Destructor may be called on empty GStreamerSource object in case if + // exception is thrown during construction. + if (m_pipeline && GST_IS_ELEMENT(m_pipeline.get())) + { + try + { + setState(GST_STATE_NULL); + } + catch(...) + { + GAPI_LOG_WARNING(NULL, "Unable to stop pipeline in destructor.\n"); + } + + m_pipeline.release(); + } +} + +std::vector GStreamerPipelineFacade::getElementsByFactoryName( + const std::string& factoryName) +{ + std::vector outElements = getElements( + [&factoryName](GstElement* element) { + GStreamerPtr name( + gst_object_get_name(GST_OBJECT(gst_element_get_factory(element)))); + return name && (0 == strcmp(name, factoryName.c_str())); + }); + + return outElements; +} + +GstElement* GStreamerPipelineFacade::getElementByName(const std::string& elementName) +{ + std::vector outElements = getElements( + [&elementName](GstElement* element) { + GStreamerPtr name(gst_element_get_name(element)); + return name && (0 == strcmp(name, elementName.c_str())); + }); + + if (outElements.empty()) + { + return nullptr; + } + else + { + GAPI_Assert(1ul == outElements.size()); + return outElements[0]; + } +} + +void GStreamerPipelineFacade::completePreroll() { + // FIXME: If there are multiple sources in pipeline and one of them is live, then pipeline + // will return GST_STATE_CHANGE_NO_PREROLL while pipeline pausing. + // But appsink may not be connected to this live source and only to anothers, + // not-live ones. So, it is not required to start the playback for appsink to complete + // the preroll. + // Starting of playback for the not-live sources before the first frame pull will lead + // to loosing of some amount of frames and pulling of the first frame can return frame + // which is far from the first. + // + // Need to handle this case or forbid to mix multiples sources of different + // categories(live, not-live) in the pipeline explicitly(assert). + + if (!m_isPrerolled.load(std::memory_order_acquire)) + { + std::lock_guard lock(m_stateChangeMutex); + + if(!m_isPrerolled.load(std::memory_order_relaxed)) + { + PipelineState state = queryState(); + + // Only move forward in the pipeline's state machine + GAPI_Assert(state.current != GST_STATE_PLAYING); + + GAPI_Assert(state.pending == GST_STATE_VOID_PENDING); + GstStateChangeReturn status = gst_element_set_state(m_pipeline, GST_STATE_PAUSED); + checkBusMessages(); + if (status == GST_STATE_CHANGE_NO_PREROLL) + { + status = gst_element_set_state(m_pipeline, GST_STATE_PLAYING); + m_isPlaying.store(true); + } + verifyStateChange(status); + + m_isPrerolled.store(true, std::memory_order_release); + } + } +} + +void GStreamerPipelineFacade::play() +{ + if (!m_isPlaying.load(std::memory_order_acquire)) + { + std::lock_guard lock(m_stateChangeMutex); + + if (!m_isPlaying.load(std::memory_order_relaxed)) + { + setState(GST_STATE_PLAYING); + m_isPlaying.store(true, std::memory_order_release); + m_isPrerolled.store(true); + } + } +} + +bool GStreamerPipelineFacade::isPlaying() { + return m_isPlaying.load(); +} + +std::vector GStreamerPipelineFacade::getElements( + std::function comparator) +{ + std::vector outElements; + GStreamerPtr it(gst_bin_iterate_elements(GST_BIN(m_pipeline.get()))); + GValue value = G_VALUE_INIT; + + GstIteratorResult status = gst_iterator_next(it, &value); + while (status != GST_ITERATOR_DONE && status != GST_ITERATOR_ERROR) + { + if (status == GST_ITERATOR_OK) + { + GstElement* element = GST_ELEMENT(g_value_get_object(&value)); + if (comparator(element)) + { + outElements.push_back(GST_ELEMENT(element)); + } + + g_value_unset(&value); + } + else if (status == GST_ITERATOR_RESYNC) + { + gst_iterator_resync(it); + } + + status = gst_iterator_next(it, &value); + } + + return outElements; +} + +PipelineState GStreamerPipelineFacade::queryState() +{ + GAPI_Assert(m_pipeline && GST_IS_ELEMENT(m_pipeline.get()) && + "GStreamer pipeline has not been created!"); + + PipelineState state; + GstClockTime timeout = 5 * GST_SECOND; + gst_element_get_state(m_pipeline, &state.current, &state.pending, timeout); + + return state; +} + +void GStreamerPipelineFacade::setState(GstState newState) +{ + PipelineState state = queryState(); + GAPI_Assert(state.pending == GST_STATE_VOID_PENDING); + + if (state.current != newState) + { + GstStateChangeReturn status = gst_element_set_state(m_pipeline, newState); + verifyStateChange(status); + } +} + +void GStreamerPipelineFacade::verifyStateChange(GstStateChangeReturn status) +{ + if (status == GST_STATE_CHANGE_ASYNC) + { + // Wait for status update. + status = gst_element_get_state(m_pipeline, NULL, NULL, GST_CLOCK_TIME_NONE); + } + + if (status == GST_STATE_CHANGE_FAILURE) + { + checkBusMessages(); + PipelineState state = queryState(); + const gchar* currentState = gst_element_state_get_name(state.current); + const gchar* pendingState = gst_element_state_get_name(state.pending); + cv::util::throw_error( + std::runtime_error(std::string("Unable to change pipeline state from ") + + std::string(currentState) + std::string(" to ") + + std::string(pendingState))); + } + + checkBusMessages(); +} + +// Handles GStreamer bus messages. +// For debugging purposes. +void GStreamerPipelineFacade::checkBusMessages() const +{ + GStreamerPtr bus(gst_element_get_bus(m_pipeline)); + + while (gst_bus_have_pending(bus)) + { + GStreamerPtr msg(gst_bus_pop(bus)); + if (!msg || !GST_IS_MESSAGE(msg.get())) + { + continue; + } + + if (gst_is_missing_plugin_message(msg)) + { + GStreamerPtr descr(gst_missing_plugin_message_get_description(msg)); + cv::util::throw_error( + std::runtime_error("Your GStreamer installation is missing a required plugin!" + "Details: " + std::string(descr))); + } + else + { + switch (GST_MESSAGE_TYPE(msg)) + { + case GST_MESSAGE_STATE_CHANGED: + { + if (GST_MESSAGE_SRC(msg.get()) == GST_OBJECT(m_pipeline.get())) + { + GstState oldState = GST_STATE_NULL, + newState = GST_STATE_NULL; + gst_message_parse_state_changed(msg, &oldState, &newState, NULL); + const gchar* oldStateName = gst_element_state_get_name(oldState); + const gchar* newStateName = gst_element_state_get_name(newState); + GAPI_LOG_INFO(NULL, "Pipeline state changed from " << oldStateName << " to " + << newStateName); + } + break; + } + case GST_MESSAGE_ERROR: + { + GError* error = NULL; + gchar* debug = NULL; + + gst_message_parse_error(msg, &error, &debug); // transfer full for out args + + GStreamerPtr err(error); + GStreamerPtr deb(debug); + + GStreamerPtr name(gst_element_get_name(GST_MESSAGE_SRC(msg.get()))); + GAPI_LOG_WARNING(NULL, "Embedded video playback halted; module " << name.get() + << " reported: " << err->message); + GAPI_LOG_WARNING(NULL, "GStreamer debug: " << deb); + + break; + } + default: + break; + } + } + } +} + +} // namespace gst +} // namespace wip +} // namespace gapi +} // namespace cv +#endif // HAVE_GSTREAMER diff --git a/modules/gapi/src/streaming/gstreamer/gstreamer_pipeline_facade.hpp b/modules/gapi/src/streaming/gstreamer/gstreamer_pipeline_facade.hpp new file mode 100644 index 0000000000..4ebd8f0197 --- /dev/null +++ b/modules/gapi/src/streaming/gstreamer/gstreamer_pipeline_facade.hpp @@ -0,0 +1,89 @@ +// This file is part of OpenCV project. +// It is subject to the license terms in the LICENSE file found in the top-level directory +// of this distribution and at http://opencv.org/license.html. +// +// Copyright (C) 2021 Intel Corporation + +#ifndef OPENCV_GAPI_STREAMING_GSTREAMER_GSTREAMER_PIPELINE_FACADE_HPP +#define OPENCV_GAPI_STREAMING_GSTREAMER_GSTREAMER_PIPELINE_FACADE_HPP + +#include "gstreamerptr.hpp" + +#include +#include +#include + +#ifdef HAVE_GSTREAMER +#include + +namespace cv { +namespace gapi { +namespace wip { +namespace gst { + +// GAPI_EXPORTS here is only for testing purposes. +struct GAPI_EXPORTS PipelineState +{ + GstState current = GST_STATE_NULL; + GstState pending = GST_STATE_NULL; +}; + +// This class represents facade for pipeline GstElement and related functions. +// Class restricts pipeline to only move forward in its state machine: +// NULL -> READY -> PAUSED -> PLAYING. +// There is no possibility to pause and resume pipeline, it can be only once played. +// GAPI_EXPORTS here is only for testing purposes. +class GAPI_EXPORTS GStreamerPipelineFacade +{ +public: + // Strong exception guarantee. + explicit GStreamerPipelineFacade(const std::string& pipeline); + + // The destructors are noexcept by default. (since C++11) + ~GStreamerPipelineFacade(); + + // Elements getters are not guarded with mutexes because elements order is not supposed + // to change in the pipeline. + std::vector getElementsByFactoryName(const std::string& factoryName); + GstElement* getElementByName(const std::string& elementName); + + // Pipeline state modifiers: can be called only once, MT-safe, mutually exclusive. + void completePreroll(); + void play(); + + // Pipeline state checker: MT-safe. + bool isPlaying(); + +private: + std::string m_pipelineDesc; + + GStreamerPtr m_pipeline; + + std::atomic m_isPrerolled; + std::atomic m_isPlaying; + // Mutex to guard state(paused, playing) from changes from multiple threads + std::mutex m_stateChangeMutex; + +private: + // This constructor is needed only to make public constructor as delegating constructor + // and allow it to throw exceptions. + GStreamerPipelineFacade(); + + // Elements getter is not guarded with mutex because elements order is not supposed + // to change in the pipeline. + std::vector getElements(std::function comparator); + + // Getters, modifiers, verifiers are not MT-safe, because they are called from + // MT-safe mutually exclusive public functions. + PipelineState queryState(); + void setState(GstState state); + void verifyStateChange(GstStateChangeReturn status); + void checkBusMessages() const; +}; + +} // namespace gst +} // namespace wip +} // namespace gapi +} // namespace cv +#endif // HAVE_GSTREAMER +#endif // OPENCV_GAPI_STREAMING_GSTREAMER_GSTREAMER_PIPELINE_FACADE_HPP diff --git a/modules/gapi/src/streaming/gstreamer/gstreamerenv.cpp b/modules/gapi/src/streaming/gstreamer/gstreamerenv.cpp new file mode 100644 index 0000000000..138589b9a6 --- /dev/null +++ b/modules/gapi/src/streaming/gstreamer/gstreamerenv.cpp @@ -0,0 +1,90 @@ +// This file is part of OpenCV project. +// It is subject to the license terms in the LICENSE file found in the top-level directory +// of this distribution and at http://opencv.org/license.html. +// +// Copyright (C) 2021 Intel Corporation + +#include "gstreamerenv.hpp" +#include "gstreamerptr.hpp" + +#ifdef HAVE_GSTREAMER +#include +#endif // HAVE_GSTREAMER + +namespace cv { +namespace gapi { +namespace wip { +namespace gst { + +#ifdef HAVE_GSTREAMER + +const GStreamerEnv& GStreamerEnv::init() +{ + static GStreamerEnv gInit; + return gInit; +} + +GStreamerEnv::GStreamerEnv() +{ + if (!gst_is_initialized()) + { + GError* error = NULL; + gst_init_check(NULL, NULL, &error); + + GStreamerPtr err(error); + + if (err) + { + cv::util::throw_error( + std::runtime_error(std::string("GStreamer initializaton error! Details: ") + + err->message)); + } + } + + // FIXME: GStreamer libs which have same MAJOR and MINOR versions are API and ABI compatible. + // If GStreamer runtime MAJOR version differs from the version the application was + // compiled with, will it fail on the linkage stage? If so, the code below isn't needed. + guint major, minor, micro, nano; + gst_version(&major, &minor, µ, &nano); + if (GST_VERSION_MAJOR != major) + { + cv::util::throw_error( + std::runtime_error(std::string("Incompatible GStreamer version: compiled with ") + + std::to_string(GST_VERSION_MAJOR) + '.' + + std::to_string(GST_VERSION_MINOR) + '.' + + std::to_string(GST_VERSION_MICRO) + '.' + + std::to_string(GST_VERSION_NANO) + + ", but runtime has " + + std::to_string(major) + '.' + std::to_string(minor) + '.' + + std::to_string(micro) + '.' + std::to_string(nano) + '.')); + } +} + +GStreamerEnv::~GStreamerEnv() +{ + gst_deinit(); +} + +#else // HAVE_GSTREAMER + +const GStreamerEnv& GStreamerEnv::init() +{ + GAPI_Assert(false && "Built without GStreamer support!"); +} + +GStreamerEnv::GStreamerEnv() +{ + GAPI_Assert(false && "Built without GStreamer support!"); +} + +GStreamerEnv::~GStreamerEnv() +{ + // No need an assert here. The assert raise C4722 warning. Constructor have already got assert. +} + +#endif // HAVE_GSTREAMER + +} // namespace gst +} // namespace wip +} // namespace gapi +} // namespace cv diff --git a/modules/gapi/src/streaming/gstreamer/gstreamerenv.hpp b/modules/gapi/src/streaming/gstreamer/gstreamerenv.hpp new file mode 100644 index 0000000000..04f7f0c16b --- /dev/null +++ b/modules/gapi/src/streaming/gstreamer/gstreamerenv.hpp @@ -0,0 +1,37 @@ +// This file is part of OpenCV project. +// It is subject to the license terms in the LICENSE file found in the top-level directory +// of this distribution and at http://opencv.org/license.html. +// +// Copyright (C) 2021 Intel Corporation + +#ifndef OPENCV_GAPI_STREAMING_GSTREAMER_GSTREAMERENV_HPP +#define OPENCV_GAPI_STREAMING_GSTREAMER_GSTREAMERENV_HPP + +namespace cv { +namespace gapi { +namespace wip { +namespace gst { + +/*! + * \brief The GStreamerEnv class + * Initializes gstreamer once in the whole process + * + * + * @note You need to build OpenCV with GStreamer support to use this class. + */ +class GStreamerEnv +{ +public: + static const GStreamerEnv& init(); + +private: + GStreamerEnv(); + ~GStreamerEnv(); +}; + +} // namespace gst +} // namespace wip +} // namespace gapi +} // namespace cv + +#endif // OPENCV_GAPI_STREAMING_GSTREAMER_GSTREAMERENV_HPP diff --git a/modules/gapi/src/streaming/gstreamer/gstreamerpipeline.cpp b/modules/gapi/src/streaming/gstreamer/gstreamerpipeline.cpp new file mode 100644 index 0000000000..6687076c7e --- /dev/null +++ b/modules/gapi/src/streaming/gstreamer/gstreamerpipeline.cpp @@ -0,0 +1,112 @@ +// This file is part of OpenCV project. +// It is subject to the license terms in the LICENSE file found in the top-level directory +// of this distribution and at http://opencv.org/license.html. +// +// Copyright (C) 2021 Intel Corporation + +#include "gstreamer_pipeline_facade.hpp" +#include "gstreamerpipeline_priv.hpp" +#include + +#ifdef HAVE_GSTREAMER +#include +#endif // HAVE_GSTREAMER + +namespace cv { +namespace gapi { +namespace wip { +namespace gst { + +#ifdef HAVE_GSTREAMER + +GStreamerPipeline::Priv::Priv(const std::string& pipelineDesc): + m_pipeline(std::make_shared(pipelineDesc)) +{ + std::vector appsinks = + m_pipeline->getElementsByFactoryName("appsink"); + + for (std::size_t i = 0ul; i < appsinks.size(); ++i) + { + auto* appsink = appsinks[i]; + GAPI_Assert(appsink != nullptr); + GStreamerPtr name(gst_element_get_name(appsink)); + auto result = m_appsinkNamesToUse.insert({ name.get(), true /* free */ }); + GAPI_Assert(std::get<1>(result) && "Each appsink name must be unique!"); + } +} + +IStreamSource::Ptr GStreamerPipeline::Priv::getStreamingSource( + const std::string& appsinkName, const GStreamerSource::OutputType outputType) +{ + auto appsinkNameIt = m_appsinkNamesToUse.find(appsinkName); + if (appsinkNameIt == m_appsinkNamesToUse.end()) + { + cv::util::throw_error(std::logic_error(std::string("There is no appsink element in the " + "pipeline with the name '") + appsinkName + "'.")); + } + + if (!appsinkNameIt->second) + { + cv::util::throw_error(std::logic_error(std::string("appsink element with the name '") + + appsinkName + "' has been already used to create a GStreamerSource!")); + } + + m_appsinkNamesToUse[appsinkName] = false /* not free */; + + IStreamSource::Ptr src; + try { + src = cv::gapi::wip::make_src(m_pipeline, appsinkName, + outputType); + } + catch(...) { + m_appsinkNamesToUse[appsinkName] = true; /* free */ + cv::util::throw_error(std::runtime_error(std::string("Error during creation of ") + + "GStreamerSource on top of '" + appsinkName + "' appsink element!")); + } + + return src; +} + +GStreamerPipeline::Priv::~Priv() { } + +#else // HAVE_GSTREAMER + +GStreamerPipeline::Priv::Priv(const std::string&) +{ + GAPI_Assert(false && "Built without GStreamer support!"); +} + +IStreamSource::Ptr GStreamerPipeline::Priv::getStreamingSource(const std::string&, + const GStreamerSource::OutputType) +{ + // No need an assert here. The assert raise C4702 warning. Constructor have already got assert. + return nullptr; +} + +GStreamerPipeline::Priv::~Priv() +{ + // No need an assert here. The assert raise C4722 warning. Constructor have already got assert. +} + +#endif // HAVE_GSTREAMER + +GStreamerPipeline::GStreamerPipeline(const std::string& pipelineDesc): + m_priv(new Priv(pipelineDesc)) { } + +IStreamSource::Ptr GStreamerPipeline::getStreamingSource( + const std::string& appsinkName, const GStreamerSource::OutputType outputType) +{ + return m_priv->getStreamingSource(appsinkName, outputType); +} + +GStreamerPipeline::~GStreamerPipeline() +{ } + +GStreamerPipeline::GStreamerPipeline(std::unique_ptr priv): + m_priv(std::move(priv)) +{ } + +} // namespace gst +} // namespace wip +} // namespace gapi +} // namespace cv diff --git a/modules/gapi/src/streaming/gstreamer/gstreamerpipeline_priv.hpp b/modules/gapi/src/streaming/gstreamer/gstreamerpipeline_priv.hpp new file mode 100644 index 0000000000..4b10d1011c --- /dev/null +++ b/modules/gapi/src/streaming/gstreamer/gstreamerpipeline_priv.hpp @@ -0,0 +1,58 @@ +// This file is part of OpenCV project. +// It is subject to the license terms in the LICENSE file found in the top-level directory +// of this distribution and at http://opencv.org/license.html. +// +// Copyright (C) 2021 Intel Corporation + +#ifndef OPENCV_GAPI_STREAMING_GSTREAMER_GSTREAMERPIPELINE_PRIV_HPP +#define OPENCV_GAPI_STREAMING_GSTREAMER_GSTREAMERPIPELINE_PRIV_HPP + +#include + +#include +#include + +namespace cv { +namespace gapi { +namespace wip { +namespace gst { + +#ifdef HAVE_GSTREAMER + +class GStreamerPipeline::Priv +{ +public: + explicit Priv(const std::string& pipeline); + + IStreamSource::Ptr getStreamingSource(const std::string& appsinkName, + const GStreamerSource::OutputType outputType); + + virtual ~Priv(); + +protected: + std::shared_ptr m_pipeline; + std::unordered_map m_appsinkNamesToUse; +}; + +#else // HAVE_GSTREAMER + +class GStreamerPipeline::Priv +{ +public: + explicit Priv(const std::string& pipeline); + + IStreamSource::Ptr getStreamingSource(const std::string& appsinkName, + const GStreamerSource::OutputType outputType); + + virtual ~Priv(); +}; + +#endif // HAVE_GSTREAMER + +} // namespace gst +} // namespace wip +} // namespace gapi +} // namespace cv + + +#endif // OPENCV_GAPI_STREAMING_GSTREAMER_GSTREAMERPIPELINE_PRIV_HPP diff --git a/modules/gapi/src/streaming/gstreamer/gstreamerptr.hpp b/modules/gapi/src/streaming/gstreamer/gstreamerptr.hpp new file mode 100644 index 0000000000..6e6bba37e3 --- /dev/null +++ b/modules/gapi/src/streaming/gstreamer/gstreamerptr.hpp @@ -0,0 +1,177 @@ +// This file is part of OpenCV project. +// It is subject to the license terms in the LICENSE file found in the top-level directory +// of this distribution and at http://opencv.org/license.html. +// +// Copyright (C) 2021 Intel Corporation + +#ifndef OPENCV_GAPI_STREAMING_GSTREAMER_GSTREAMERPTR_HPP +#define OPENCV_GAPI_STREAMING_GSTREAMER_GSTREAMERPTR_HPP + +#include + +#include + +#ifdef HAVE_GSTREAMER +#include +#include + +namespace cv { +namespace gapi { +namespace wip { +namespace gst { + +template static inline void GStreamerPtrUnrefObject(T* ptr) +{ + if (ptr) + { + gst_object_unref(G_OBJECT(ptr)); + } +} + +template static inline void GStreamerPtrRelease(T* ptr); + +template<> inline void GStreamerPtrRelease(GError* ptr) +{ + if (ptr) + { + g_error_free(ptr); + } +} + +template<> inline void GStreamerPtrRelease(GstElement* ptr) +{ + GStreamerPtrUnrefObject(ptr); +} + +template<> inline void GStreamerPtrRelease(GstElementFactory* ptr) +{ + GStreamerPtrUnrefObject(ptr); +} + +template<> inline void GStreamerPtrRelease(GstPad* ptr) +{ + GStreamerPtrUnrefObject(ptr); +} + +template<> inline void GStreamerPtrRelease(GstBus* ptr) +{ + GStreamerPtrUnrefObject(ptr); +} + +template<> inline void GStreamerPtrRelease(GstAllocator* ptr) +{ + GStreamerPtrUnrefObject(ptr); +} + +template<> inline void GStreamerPtrRelease(GstVideoInfo* ptr) +{ + if (ptr) + { + gst_video_info_free(ptr); + } +} + +template<> inline void GStreamerPtrRelease(GstCaps* ptr) +{ + if (ptr) + { + gst_caps_unref(ptr); + } +} + +template<> inline void GStreamerPtrRelease(GstMemory* ptr) +{ + if (ptr) + { + gst_memory_unref(ptr); + } +} + +template<> inline void GStreamerPtrRelease(GstBuffer* ptr) +{ + if (ptr) + { + gst_buffer_unref(ptr); + } +} + +template<> inline void GStreamerPtrRelease(GstSample* ptr) +{ + if (ptr) + { + gst_sample_unref(ptr); + } +} + +template<> inline void GStreamerPtrRelease(GstMessage* ptr) +{ + if (ptr) + { + gst_message_unref(ptr); + } +} + +template<> inline void GStreamerPtrRelease(GstIterator* ptr) +{ + if (ptr) + { + gst_iterator_free(ptr); + } +} + +template<> inline void GStreamerPtrRelease(GstQuery* ptr) +{ + if (ptr) + { + gst_query_unref(ptr); + } +} + +template<> inline void GStreamerPtrRelease(char* ptr) +{ + if (ptr) + { + g_free(ptr); + } +} + +// NOTE: The main concept of this class is to be owner of some passed to it piece of memory. +// (be owner = free this memory or reduce reference count to it after use). +// More specifically, GStreamerPtr is designed to own memory returned from GStreamer/GLib +// functions, which are marked as [transfer full] in documentation. +// [transfer full] means that function fully transfers ownership of returned memory to the +// receiving piece of code. +// +// Memory ownership and ownership transfer concept: +// https://developer.gnome.org/programming-guidelines/stable/memory-management.html.en#g-clear-object + +// NOTE: GStreamerPtr can only own strong references, not floating ones. +// For floating references please call g_object_ref_sink(reference) before wrapping +// it with GStreamerPtr. +// See https://developer.gnome.org/gobject/stable/gobject-The-Base-Object-Type.html#floating-ref +// for floating references. +// NOTE: GStreamerPtr doesn't support pointers to arrays, only pointers to single objects. +template class GStreamerPtr : + public std::unique_ptr)> +{ + using BaseClass = std::unique_ptr)>; + +public: + constexpr GStreamerPtr() noexcept : BaseClass(nullptr, GStreamerPtrRelease) { } + constexpr GStreamerPtr(std::nullptr_t) noexcept : BaseClass(nullptr, GStreamerPtrRelease) { } + explicit GStreamerPtr(typename BaseClass::pointer p) noexcept : + BaseClass(p, GStreamerPtrRelease) { } + + GStreamerPtr& operator=(T* p) noexcept { *this = std::move(GStreamerPtr(p)); return *this; } + + inline operator T*() noexcept { return this->get(); } + // There is no const correctness in GStreamer C API + inline operator /*const*/ T*() const noexcept { return (T*)this->get(); } +}; + +} // namespace gst +} // namespace wip +} // namespace gapi +} // namespace cv +#endif // HAVE_GSTREAMER +#endif // OPENCV_GAPI_STREAMING_GSTREAMER_GSTREAMERPTR_HPP diff --git a/modules/gapi/src/streaming/gstreamer/gstreamersource.cpp b/modules/gapi/src/streaming/gstreamer/gstreamersource.cpp new file mode 100644 index 0000000000..661125657c --- /dev/null +++ b/modules/gapi/src/streaming/gstreamer/gstreamersource.cpp @@ -0,0 +1,383 @@ +// This file is part of OpenCV project. +// It is subject to the license terms in the LICENSE file found in the top-level directory +// of this distribution and at http://opencv.org/license.html. +// +// Copyright (C) 2021 Intel Corporation + +#include "gstreamer_buffer_utils.hpp" + +#include "gstreamer_media_adapter.hpp" + +#include "gstreamersource_priv.hpp" +#include + +#include + +#include + +#include + +#ifdef HAVE_GSTREAMER +#include +#include +#include +#endif // HAVE_GSTREAMER + +namespace cv { +namespace gapi { +namespace wip { +namespace gst { + +#ifdef HAVE_GSTREAMER + +constexpr char NV12_CAPS_STRING[] = + "video/x-raw,format=NV12;video/x-raw(memory:DMABuf),format=NV12"; + +namespace { +GstPadProbeReturn appsinkQueryCallback(GstPad*, GstPadProbeInfo* info, gpointer) +{ + GstQuery *query = GST_PAD_PROBE_INFO_QUERY(info); + + if (GST_QUERY_TYPE(query) != GST_QUERY_ALLOCATION) + return GST_PAD_PROBE_OK; + + gst_query_add_allocation_meta(query, GST_VIDEO_META_API_TYPE, NULL); + + return GST_PAD_PROBE_HANDLED; +} +} // anonymous namespace + +GStreamerSource::Priv::Priv(const std::string& pipelineDesc, + const GStreamerSource::OutputType outputType) : + m_pipeline(std::make_shared(pipelineDesc)), + m_outputType(outputType) +{ + GAPI_Assert((m_outputType == GStreamerSource::OutputType::FRAME || + m_outputType == GStreamerSource::OutputType::MAT) + && "Unsupported output type for GStreamerSource!"); + + auto appsinks = m_pipeline->getElementsByFactoryName("appsink"); + GAPI_Assert(1ul == appsinks.size() && + "GStreamerSource can accept pipeline with only 1 appsink element inside!\n" + "Please, note, that amount of sink elements of other than appsink type is not limited.\n"); + + m_appsink = GST_ELEMENT(gst_object_ref(appsinks[0])); + + configureAppsink(); +} + +GStreamerSource::Priv::Priv(std::shared_ptr pipeline, + const std::string& appsinkName, + const GStreamerSource::OutputType outputType) : + m_pipeline(pipeline), + m_outputType(outputType) +{ + GAPI_Assert((m_outputType == GStreamerSource::OutputType::FRAME || + m_outputType == GStreamerSource::OutputType::MAT) + && "Unsupported output type for GStreamerSource!"); + + m_appsink = (GST_ELEMENT(gst_object_ref(m_pipeline->getElementByName(appsinkName)))); + configureAppsink(); +} + +bool GStreamerSource::Priv::pull(cv::gapi::wip::Data& data) +{ + bool result = false; + switch(m_outputType) { + case GStreamerSource::OutputType::FRAME: { + cv::MediaFrame frame; + result = retrieveFrame(frame); + if (result) { + data = frame; + } + break; + } + case GStreamerSource::OutputType::MAT: { + cv::Mat mat; + result = retrieveFrame(mat); + if (result) { + data = mat; + } + break; + } + } + + if (result) { + data.meta[cv::gapi::streaming::meta_tag::timestamp] = computeTimestamp(); + data.meta[cv::gapi::streaming::meta_tag::seq_id] = m_frameId++; + } + + return result; +} + +GMetaArg GStreamerSource::Priv::descr_of() noexcept +{ + // Prepare frame metadata if it wasn't prepared yet. + prepareVideoMeta(); + + switch(m_outputType) { + case GStreamerSource::OutputType::FRAME: { + return GMetaArg { m_mediaFrameMeta }; + } + case GStreamerSource::OutputType::MAT: { + return GMetaArg { m_matMeta }; + } + } + + return GMetaArg { }; +} + +void GStreamerSource::Priv::configureAppsink() { + // NOTE: appsink element should be configured before pipeline launch. + GAPI_Assert(!m_pipeline->isPlaying()); + + // TODO: is 1 single buffer really high enough? + gst_app_sink_set_max_buffers(GST_APP_SINK(m_appsink.get()), 1); + + // Do not emit signals: all calls will be synchronous and blocking. + gst_app_sink_set_emit_signals(GST_APP_SINK(m_appsink.get()), FALSE); + + GStreamerPtr nv12Caps(gst_caps_from_string(NV12_CAPS_STRING)); + + GStreamerPtr appsinkPad(gst_element_get_static_pad(m_appsink, "sink")); + GStreamerPtr peerCaps(gst_pad_peer_query_caps(appsinkPad, NULL)); + if (!gst_caps_can_intersect(peerCaps, nv12Caps)) { + cv::util::throw_error( + std::logic_error("appsink element can only consume video-frame in NV12 format in " + "GStreamerSource")); + } + + gst_app_sink_set_caps(GST_APP_SINK(m_appsink.get()), nv12Caps); + + gst_pad_add_probe(appsinkPad, GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM, appsinkQueryCallback, + NULL, NULL); +} + +void GStreamerSource::Priv::prepareVideoMeta() +{ + if (!m_isMetaPrepared) { + m_pipeline->completePreroll(); + + GStreamerPtr prerollSample( +#if GST_VERSION_MINOR >= 10 + gst_app_sink_try_pull_preroll(GST_APP_SINK(m_appsink.get()), 5 * GST_SECOND)); +#else // GST_VERSION_MINOR < 10 + // TODO: This function may cause hang with some pipelines, need to check whether these + // pipelines are really wrong or not? + gst_app_sink_pull_preroll(GST_APP_SINK(m_appsink.get()))); +#endif // GST_VERSION_MINOR >= 10 + GAPI_Assert(prerollSample != nullptr); + + GstCaps* prerollCaps(gst_sample_get_caps(prerollSample)); + GAPI_Assert(prerollCaps != nullptr); + + const GstStructure* structure = gst_caps_get_structure(prerollCaps, 0); + + // Width and height held in GstCaps structure are format-independent width and height + // of the frame. So the actual height of the output buffer in NV12 format will be + // height * 3/2. + int width = 0; + int height = 0; + if (!gst_structure_get_int(structure, "width", &width) || + !gst_structure_get_int(structure, "height", &height)) + { + cv::util::throw_error(std::logic_error("Cannot query video width/height.")); + } + + switch(m_outputType) { + case GStreamerSource::OutputType::FRAME: { + // Construct metadata for media frame. + m_mediaFrameMeta = GFrameDesc { cv::MediaFormat::NV12, cv::Size(width, height) }; + break; + } + case GStreamerSource::OutputType::MAT: { + // Construct metadata for BGR mat. + m_matMeta = GMatDesc { CV_8U, 3, cv::Size(width, height), false }; + break; + } + } + + // Fill GstVideoInfo structure to work further with GstVideoFrame class. + if (!gst_video_info_from_caps(&m_videoInfo, prerollCaps)) { + cv::util::throw_error(std::logic_error("preroll sample has invalid caps.")); + } + GAPI_Assert(GST_VIDEO_INFO_N_PLANES(&m_videoInfo) == 2); + GAPI_Assert(GST_VIDEO_INFO_FORMAT(&m_videoInfo) == GST_VIDEO_FORMAT_NV12); + + m_isMetaPrepared = true; + } +} + +int64_t GStreamerSource::Priv::computeTimestamp() +{ + GAPI_Assert(m_buffer && "Pulled buffer is nullptr!"); + + int64_t timestamp { }; + + GstClockTime pts = GST_BUFFER_PTS(m_buffer); + if (GST_CLOCK_TIME_IS_VALID(pts)) { + timestamp = GST_TIME_AS_USECONDS(pts); + } else { + const auto now = std::chrono::system_clock::now(); + const auto dur = std::chrono::duration_cast + (now.time_since_epoch()); + timestamp = int64_t{dur.count()}; + } + + return timestamp; +} + +bool GStreamerSource::Priv::pullBuffer() +{ + // Start the pipeline if it is not in the playing state yet + if (!m_isPipelinePlaying) { + m_pipeline->play(); + m_isPipelinePlaying = true; + } + + // Bail out if EOS + if (gst_app_sink_is_eos(GST_APP_SINK(m_appsink.get()))) + { + return false; + } + + m_sample = gst_app_sink_pull_sample(GST_APP_SINK(m_appsink.get())); + + // TODO: GAPI_Assert because of already existed check for EOS? + if (m_sample == nullptr) + { + return false; + } + + m_buffer = gst_sample_get_buffer(m_sample); + GAPI_Assert(m_buffer && "Grabbed sample has no buffer!"); + + return true; +} + +bool GStreamerSource::Priv::retrieveFrame(cv::Mat& data) +{ + // Prepare metadata if it isn't prepared yet. + prepareVideoMeta(); + + bool result = pullBuffer(); + if (!result) + { + return false; + } + + // TODO: Use RAII for map/unmap + GstVideoFrame videoFrame; + gstreamer_utils::mapBufferToFrame(*m_buffer, m_videoInfo, videoFrame, GST_MAP_READ); + + try + { + // m_matMeta holds width and height for 8U BGR frame, but actual + // frame m_buffer we request from GStreamer pipeline has 8U NV12 format. + // Constructing y and uv cv::Mat-s from such a m_buffer: + GAPI_Assert((uint8_t*)GST_VIDEO_FRAME_PLANE_DATA(&videoFrame, 1) == + (uint8_t*)GST_VIDEO_FRAME_PLANE_DATA(&videoFrame, 0) + + GST_VIDEO_FRAME_PLANE_OFFSET(&videoFrame, 1)); + + cv::Mat y(m_matMeta.size, CV_8UC1, + (uint8_t*)GST_VIDEO_FRAME_PLANE_DATA(&videoFrame, 0) + + GST_VIDEO_FRAME_PLANE_OFFSET(&videoFrame, 0), + GST_VIDEO_FRAME_PLANE_STRIDE(&videoFrame, 0)); + cv::Mat uv(m_matMeta.size / 2, CV_8UC2, + (uint8_t*)GST_VIDEO_FRAME_PLANE_DATA(&videoFrame, 0) + + GST_VIDEO_FRAME_PLANE_OFFSET(&videoFrame, 1), + GST_VIDEO_FRAME_PLANE_STRIDE(&videoFrame, 1)); + + cv::cvtColorTwoPlane(y, uv, data, cv::COLOR_YUV2BGR_NV12); + } + catch (...) + { + gst_video_frame_unmap(&videoFrame); + cv::util::throw_error(std::runtime_error("NV12 buffer conversion to BGR is failed!")); + } + gst_video_frame_unmap(&videoFrame); + + return true; +} + +bool GStreamerSource::Priv::retrieveFrame(cv::MediaFrame& data) +{ + // Prepare metadata if it isn't prepared yet. + prepareVideoMeta(); + + bool result = pullBuffer(); + if (!result) + { + return false; + } + + data = cv::MediaFrame::Create(m_mediaFrameMeta, &m_videoInfo, + m_buffer); + + return true; +} + +GStreamerSource::Priv::~Priv() { } + +#else // HAVE_GSTREAMER + +GStreamerSource::Priv::Priv(const std::string&, const GStreamerSource::OutputType) +{ + GAPI_Assert(false && "Built without GStreamer support!"); +} + +GStreamerSource::Priv::Priv(std::shared_ptr, const std::string&, + const GStreamerSource::OutputType) +{ + GAPI_Assert(false && "Built without GStreamer support!"); +} + +bool GStreamerSource::Priv::pull(cv::gapi::wip::Data&) +{ + // No need an assert here. Constructor have already got assert. + return false; +} + +GMetaArg GStreamerSource::Priv::descr_of() const noexcept +{ + // No need an assert here. The assert raise C4702 warning. Constructor have already got assert. + return GMetaArg{}; +} + +GStreamerSource::Priv::~Priv() +{ + // No need an assert here. The assert raise C4722 warning. Constructor have already got assert. +} + +#endif // HAVE_GSTREAMER + +GStreamerSource::GStreamerSource(const std::string& pipeline, + const GStreamerSource::OutputType outputType): + m_priv(new Priv(pipeline, outputType)) { } + +GStreamerSource::GStreamerSource(std::shared_ptr pipeline, + const std::string& appsinkName, + const GStreamerSource::OutputType outputType): + m_priv(new Priv(pipeline, appsinkName, outputType)) { } + +bool GStreamerSource::pull(cv::gapi::wip::Data& data) +{ + return m_priv->pull(data); +} + +GMetaArg GStreamerSource::descr_of() const +{ + return m_priv->descr_of(); +} + +GStreamerSource::~GStreamerSource() +{ } + +GStreamerSource::GStreamerSource(std::unique_ptr priv): + m_priv(std::move(priv)) +{ } + +} // namespace gst +} // namespace wip +} // namespace gapi +} // namespace cv diff --git a/modules/gapi/src/streaming/gstreamer/gstreamersource_priv.hpp b/modules/gapi/src/streaming/gstreamer/gstreamersource_priv.hpp new file mode 100644 index 0000000000..b0940c48a3 --- /dev/null +++ b/modules/gapi/src/streaming/gstreamer/gstreamersource_priv.hpp @@ -0,0 +1,94 @@ +// This file is part of OpenCV project. +// It is subject to the license terms in the LICENSE file found in the top-level directory +// of this distribution and at http://opencv.org/license.html. +// +// Copyright (C) 2021 Intel Corporation + +#ifndef OPENCV_GAPI_STREAMING_GSTREAMER_GSTREAMERSOURCE_PRIV_HPP +#define OPENCV_GAPI_STREAMING_GSTREAMER_GSTREAMERSOURCE_PRIV_HPP + +#include "gstreamerptr.hpp" +#include "gstreamer_pipeline_facade.hpp" +#include + +#include + +#ifdef HAVE_GSTREAMER +#include +#include +#endif // HAVE_GSTREAMER + +namespace cv { +namespace gapi { +namespace wip { +namespace gst { + +#ifdef HAVE_GSTREAMER + +class GStreamerSource::Priv +{ +public: + Priv(const std::string& pipeline, const GStreamerSource::OutputType outputType); + + Priv(std::shared_ptr pipeline, const std::string& appsinkName, + const GStreamerSource::OutputType outputType); + + bool pull(cv::gapi::wip::Data& data); + + // non-const in difference with GStreamerSource, because contains delayed meta initialization + GMetaArg descr_of() noexcept; + + virtual ~Priv(); + +protected: + // Shares: + std::shared_ptr m_pipeline; + + // Owns: + GStreamerPtr m_appsink; + GStreamerPtr m_sample; + GstBuffer* m_buffer = nullptr; // Actual frame memory holder + GstVideoInfo m_videoInfo; // Information about Video frame + + GStreamerSource::OutputType m_outputType = GStreamerSource::OutputType::MAT; + + GMatDesc m_matMeta; + GFrameDesc m_mediaFrameMeta; + + bool m_isMetaPrepared = false; + bool m_isPipelinePlaying = false; + + int64_t m_frameId = 0L; + +protected: + void configureAppsink(); + void prepareVideoMeta(); + + int64_t computeTimestamp(); + + bool pullBuffer(); + bool retrieveFrame(cv::Mat& data); + bool retrieveFrame(cv::MediaFrame& data); +}; + +#else // HAVE_GSTREAMER + +class GStreamerSource::Priv +{ +public: + Priv(const std::string& pipeline, const GStreamerSource::OutputType outputType); + Priv(std::shared_ptr pipeline, const std::string& appsinkName, + const GStreamerSource::OutputType outputType); + bool pull(cv::gapi::wip::Data& data); + GMetaArg descr_of() const noexcept; + virtual ~Priv(); +}; + +#endif // HAVE_GSTREAMER + +} // namespace gst +} // namespace wip +} // namespace gapi +} // namespace cv + +#endif // OPENCV_GAPI_STREAMING_GSTREAMER_GSTREAMERSOURCE_PRIV_HPP diff --git a/modules/gapi/test/streaming/gapi_gstreamer_pipeline_facade_int_tests.cpp b/modules/gapi/test/streaming/gapi_gstreamer_pipeline_facade_int_tests.cpp new file mode 100644 index 0000000000..7b809a8847 --- /dev/null +++ b/modules/gapi/test/streaming/gapi_gstreamer_pipeline_facade_int_tests.cpp @@ -0,0 +1,188 @@ +// This file is part of OpenCV project. +// It is subject to the license terms in the LICENSE file found in the top-level directory +// of this distribution and at http://opencv.org/license.html. +// +// Copyright (C) 2021 Intel Corporation + +#include "../test/common/gapi_tests_common.hpp" + +#include "../src/streaming/gstreamer/gstreamer_pipeline_facade.hpp" +#include "../src/streaming/gstreamer/gstreamerptr.hpp" + +#include + +#include + +#ifdef HAVE_GSTREAMER +#include + +namespace opencv_test +{ + +TEST(GStreamerPipelineFacadeTest, GetElsByFactoryNameUnitTest) +{ + auto comparator = [](GstElement* element, const std::string& factoryName) { + cv::gapi::wip::gst::GStreamerPtr name( + gst_object_get_name(GST_OBJECT(gst_element_get_factory(element)))); + return name && (0 == strcmp(name, factoryName.c_str())); + }; + + cv::gapi::wip::gst::GStreamerPipelineFacade + pipelineFacade("videotestsrc is-live=true pattern=colors num-buffers=10 ! " + "videorate ! videoscale ! " + "video/x-raw,width=1920,height=1080,framerate=3/1 ! " + "appsink name=sink1 " + "videotestsrc is-live=true pattern=colors num-buffers=10 ! " + "videorate ! videoscale ! " + "video/x-raw,width=1920,height=1080,framerate=3/1 ! " + "appsink name=sink2"); + + auto videotestsrcs = pipelineFacade.getElementsByFactoryName("videotestsrc"); + EXPECT_EQ(2u, videotestsrcs.size()); + for (auto&& src : videotestsrcs) { + EXPECT_TRUE(comparator(src, "videotestsrc")); + } + + auto appsinks = pipelineFacade.getElementsByFactoryName("appsink"); + EXPECT_EQ(2u, appsinks.size()); + for (auto&& sink : appsinks) { + EXPECT_TRUE(comparator(sink, "appsink")); + } +} + +TEST(GStreamerPipelineFacadeTest, GetElByNameUnitTest) +{ + auto comparator = [](GstElement* element, const std::string& elementName) { + cv::gapi::wip::gst::GStreamerPtr name(gst_element_get_name(element)); + return name && (0 == strcmp(name, elementName.c_str())); + }; + + cv::gapi::wip::gst::GStreamerPipelineFacade + pipelineFacade("videotestsrc is-live=true pattern=colors num-buffers=10 ! " + "videorate ! videoscale ! " + "video/x-raw,width=1920,height=1080,framerate=3/1 ! " + "appsink name=sink1 " + "videotestsrc is-live=true pattern=colors num-buffers=10 ! " + "videorate ! videoscale ! " + "video/x-raw,width=1920,height=1080,framerate=3/1 ! " + "appsink name=sink2"); + + auto appsink1 = pipelineFacade.getElementByName("sink1"); + GAPI_Assert(appsink1 != nullptr); + EXPECT_TRUE(comparator(appsink1, "sink1")); + auto appsink2 = pipelineFacade.getElementByName("sink2"); + GAPI_Assert(appsink2 != nullptr); + EXPECT_TRUE(comparator(appsink2, "sink2")); +} + +TEST(GStreamerPipelineFacadeTest, CompletePrerollUnitTest) +{ + cv::gapi::wip::gst::GStreamerPipelineFacade + pipelineFacade("videotestsrc is-live=true pattern=colors num-buffers=10 ! " + "videorate ! videoscale ! " + "video/x-raw,width=1920,height=1080,framerate=3/1 ! " + "appsink name=sink1 " + "videotestsrc is-live=true pattern=colors num-buffers=10 ! " + "videorate ! videoscale ! " + "video/x-raw,width=1920,height=1080,framerate=3/1 ! " + "appsink name=sink2"); + + auto appsink = pipelineFacade.getElementByName("sink1"); + pipelineFacade.completePreroll(); + + cv::gapi::wip::gst::GStreamerPtr prerollSample( +#if GST_VERSION_MINOR >= 10 + gst_app_sink_try_pull_preroll(GST_APP_SINK(appsink), 5 * GST_SECOND) +#else // GST_VERSION_MINOR < 10 + // TODO: This function may cause hang with some pipelines, need to check whether these + // pipelines are really wrong or not? + gst_app_sink_pull_preroll(GST_APP_SINK(appsink)) +#endif // GST_VERSION_MINOR >= 10 + ); + GAPI_Assert(prerollSample != nullptr); +} + +TEST(GStreamerPipelineFacadeTest, PlayUnitTest) +{ + cv::gapi::wip::gst::GStreamerPipelineFacade + pipelineFacade("videotestsrc is-live=true pattern=colors num-buffers=10 ! " + "videorate ! videoscale ! " + "video/x-raw,width=1920,height=1080,framerate=3/1 ! " + "appsink name=sink1 " + "videotestsrc is-live=true pattern=colors num-buffers=10 ! " + "videorate ! videoscale ! " + "video/x-raw,width=1920,height=1080,framerate=3/1 ! " + "appsink name=sink2"); + + auto appsink = pipelineFacade.getElementByName("sink2"); + + pipelineFacade.play(); + + cv::gapi::wip::gst::PipelineState state; + GstStateChangeReturn status = + gst_element_get_state(appsink, &state.current, &state.pending, 5 * GST_SECOND); + EXPECT_EQ(GST_STATE_CHANGE_SUCCESS, status); + EXPECT_EQ(GST_STATE_PLAYING, state.current); + EXPECT_EQ(GST_STATE_VOID_PENDING, state.pending); +} + +TEST(GStreamerPipelineFacadeTest, IsPlayingUnitTest) +{ + cv::gapi::wip::gst::GStreamerPipelineFacade + pipelineFacade("videotestsrc is-live=true pattern=colors num-buffers=10 ! " + "videorate ! videoscale ! " + "video/x-raw,width=1920,height=1080,framerate=3/1 ! " + "appsink name=sink1 " + "videotestsrc is-live=true pattern=colors num-buffers=10 ! " + "videorate ! videoscale ! " + "video/x-raw,width=1920,height=1080,framerate=3/1 ! " + "appsink name=sink2"); + + EXPECT_EQ(false, pipelineFacade.isPlaying()); + pipelineFacade.play(); + EXPECT_EQ(true, pipelineFacade.isPlaying()); +} + +TEST(GStreamerPipelineFacadeTest, MTSafetyUnitTest) +{ + cv::gapi::wip::gst::GStreamerPipelineFacade + pipelineFacade("videotestsrc is-live=true pattern=colors num-buffers=10 ! " + "videorate ! videoscale ! " + "video/x-raw,width=1920,height=1080,framerate=3/1 ! " + "appsink name=sink1 " + "videotestsrc is-live=true pattern=colors num-buffers=10 ! " + "videorate ! videoscale ! " + "video/x-raw,width=1920,height=1080,framerate=3/1 ! " + "appsink name=sink2"); + + auto prerollRoutine = [&pipelineFacade](){ pipelineFacade.completePreroll(); }; + auto playRoutine = [&pipelineFacade](){ pipelineFacade.play(); }; + auto isPlayingRoutine = [&pipelineFacade](){ pipelineFacade.isPlaying(); }; + + using f = std::function; + + auto routinesLauncher = [](const f& r1, const f& r2, const f& r3) { + std::vector routines { r1, r2, r3 }; + std::vector threads { }; + + for (auto&& r : routines) { + threads.emplace_back(r); + } + + for (auto&& t : threads) { + t.join(); + } + }; + + routinesLauncher(prerollRoutine, playRoutine, isPlayingRoutine); + routinesLauncher(prerollRoutine, isPlayingRoutine, playRoutine); + routinesLauncher(isPlayingRoutine, prerollRoutine, playRoutine); + routinesLauncher(playRoutine, prerollRoutine, isPlayingRoutine); + routinesLauncher(playRoutine, isPlayingRoutine, prerollRoutine); + routinesLauncher(isPlayingRoutine, playRoutine, prerollRoutine); + + EXPECT_TRUE(true); +} +} // namespace opencv_test + +#endif // HAVE_GSTREAMER diff --git a/modules/gapi/test/streaming/gapi_gstreamersource_tests.cpp b/modules/gapi/test/streaming/gapi_gstreamersource_tests.cpp new file mode 100644 index 0000000000..0478d2dc1d --- /dev/null +++ b/modules/gapi/test/streaming/gapi_gstreamersource_tests.cpp @@ -0,0 +1,401 @@ +// This file is part of OpenCV project. +// It is subject to the license terms in the LICENSE file found in the top-level directory +// of this distribution and at http://opencv.org/license.html. +// +// Copyright (C) 2021 Intel Corporation + +#include "../test/common/gapi_tests_common.hpp" + +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#include + +#include + +#ifdef HAVE_GSTREAMER + +namespace opencv_test +{ + +struct GStreamerSourceTest : public TestWithParam> +{ }; + +TEST_P(GStreamerSourceTest, AccuracyTest) +{ + std::string pipeline; + cv::Size expectedFrameSize; + std::size_t streamLength { }; + std::tie(pipeline, expectedFrameSize, streamLength) = GetParam(); + + // Graph declaration: + cv::GMat in; + auto out = cv::gapi::copy(in); + cv::GComputation c(cv::GIn(in), cv::GOut(out)); + + // Graph compilation for streaming mode: + auto ccomp = c.compileStreaming(); + + EXPECT_TRUE(ccomp); + EXPECT_FALSE(ccomp.running()); + + // GStreamer streaming source configuration: + ccomp.setSource(pipeline); + + // Start of streaming: + ccomp.start(); + EXPECT_TRUE(ccomp.running()); + + // Streaming - pulling of frames until the end: + cv::Mat in_mat_gapi; + + EXPECT_TRUE(ccomp.pull(cv::gout(in_mat_gapi))); + EXPECT_TRUE(!in_mat_gapi.empty()); + EXPECT_EQ(expectedFrameSize, in_mat_gapi.size()); + EXPECT_EQ(CV_8UC3, in_mat_gapi.type()); + + std::size_t framesCount = 1UL; + while (ccomp.pull(cv::gout(in_mat_gapi))) { + EXPECT_TRUE(!in_mat_gapi.empty()); + EXPECT_EQ(expectedFrameSize, in_mat_gapi.size()); + EXPECT_EQ(CV_8UC3, in_mat_gapi.type()); + + framesCount++; + } + + EXPECT_FALSE(ccomp.running()); + ccomp.stop(); + + EXPECT_FALSE(ccomp.running()); + + EXPECT_EQ(streamLength, framesCount); +} + +TEST_P(GStreamerSourceTest, TimestampsTest) +{ + std::string pipeline; + std::size_t streamLength { }; + std::tie(pipeline, std::ignore, streamLength) = GetParam(); + + // Graph declaration: + cv::GMat in; + cv::GMat copied = cv::gapi::copy(in); + cv::GOpaque outId = cv::gapi::streaming::seq_id(copied); + cv::GOpaque outTs = cv::gapi::streaming::timestamp(copied); + cv::GComputation c(cv::GIn(in), cv::GOut(outId, outTs)); + + // Graph compilation for streaming mode: + auto ccomp = c.compileStreaming(); + + EXPECT_TRUE(ccomp); + EXPECT_FALSE(ccomp.running()); + + // GStreamer streaming source configuration: + ccomp.setSource(pipeline); + + // Start of streaming: + ccomp.start(); + EXPECT_TRUE(ccomp.running()); + + // Streaming - pulling of frames until the end: + int64_t seqId; + int64_t timestamp; + + std::vector allSeqIds; + std::vector allTimestamps; + + while (ccomp.pull(cv::gout(seqId, timestamp))) { + allSeqIds.push_back(seqId); + allTimestamps.push_back(timestamp); + } + + EXPECT_FALSE(ccomp.running()); + ccomp.stop(); + + EXPECT_FALSE(ccomp.running()); + + EXPECT_EQ(0L, allSeqIds.front()); + EXPECT_EQ(int64_t(streamLength) - 1, allSeqIds.back()); + EXPECT_EQ(streamLength, allSeqIds.size()); + EXPECT_TRUE(std::is_sorted(allSeqIds.begin(), allSeqIds.end())); + EXPECT_EQ(allSeqIds.size(), std::set(allSeqIds.begin(), allSeqIds.end()).size()); + + EXPECT_EQ(streamLength, allTimestamps.size()); + EXPECT_TRUE(std::is_sorted(allTimestamps.begin(), allTimestamps.end())); +} + +G_TYPED_KERNEL(GGstFrameCopyToNV12, (GFrame)>, + "org.opencv.test.gstframe_copy_to_nv12") +{ + static std::tuple outMeta(GFrameDesc desc) { + GMatDesc y { CV_8U, 1, desc.size, false }; + GMatDesc uv { CV_8U, 2, desc.size / 2, false }; + + return std::make_tuple(y, uv); + } +}; + +GAPI_OCV_KERNEL(GOCVGstFrameCopyToNV12, GGstFrameCopyToNV12) +{ + static void run(const cv::MediaFrame& in, cv::Mat& y, cv::Mat& uv) + { + auto view = in.access(cv::MediaFrame::Access::R); + cv::Mat ly(y.size(), y.type(), view.ptr[0], view.stride[0]); + cv::Mat luv(uv.size(), uv.type(), view.ptr[1], view.stride[1]); + + ly.copyTo(y); + luv.copyTo(uv); + } +}; + +TEST_P(GStreamerSourceTest, GFrameTest) +{ + std::string pipeline; + cv::Size expectedFrameSize; + std::size_t streamLength { }; + std::tie(pipeline, expectedFrameSize, streamLength) = GetParam(); + + // Graph declaration: + cv::GFrame in; + cv::GMat copiedY, copiedUV; + std::tie(copiedY, copiedUV) = GGstFrameCopyToNV12::on(in); + cv::GComputation c(cv::GIn(in), cv::GOut(copiedY, copiedUV)); + + // Graph compilation for streaming mode: + auto ccomp = c.compileStreaming(cv::compile_args(cv::gapi::kernels())); + + EXPECT_TRUE(ccomp); + EXPECT_FALSE(ccomp.running()); + + // GStreamer streaming source configuration: + ccomp.setSource + (pipeline, cv::gapi::wip::GStreamerSource::OutputType::FRAME); + + // Start of streaming: + ccomp.start(); + EXPECT_TRUE(ccomp.running()); + + // Streaming - pulling of frames until the end: + cv::Mat y_mat, uv_mat; + + EXPECT_TRUE(ccomp.pull(cv::gout(y_mat, uv_mat))); + EXPECT_TRUE(!y_mat.empty()); + EXPECT_TRUE(!uv_mat.empty()); + + cv::Size expectedYSize = expectedFrameSize; + cv::Size expectedUVSize = expectedFrameSize / 2; + + EXPECT_EQ(expectedYSize, y_mat.size()); + EXPECT_EQ(expectedUVSize, uv_mat.size()); + + EXPECT_EQ(CV_8UC1, y_mat.type()); + EXPECT_EQ(CV_8UC2, uv_mat.type()); + + std::size_t framesCount = 1UL; + while (ccomp.pull(cv::gout(y_mat, uv_mat))) { + EXPECT_TRUE(!y_mat.empty()); + EXPECT_TRUE(!uv_mat.empty()); + + EXPECT_EQ(expectedYSize, y_mat.size()); + EXPECT_EQ(expectedUVSize, uv_mat.size()); + + EXPECT_EQ(CV_8UC1, y_mat.type()); + EXPECT_EQ(CV_8UC2, uv_mat.type()); + + framesCount++; + } + + EXPECT_FALSE(ccomp.running()); + ccomp.stop(); + + EXPECT_FALSE(ccomp.running()); + + EXPECT_EQ(streamLength, framesCount); +} + +// FIXME: Need to launch with sudo. May be infrastructure problems. +// TODO: It is needed to add tests for streaming from native KMB camera: kmbcamsrc +// GStreamer element. +INSTANTIATE_TEST_CASE_P(CameraEmulatingPipeline, GStreamerSourceTest, + Combine(Values("videotestsrc is-live=true pattern=colors num-buffers=10 ! " + "videorate ! videoscale ! " + "video/x-raw,width=1920,height=1080,framerate=3/1 ! " + "appsink"), + Values(cv::Size(1920, 1080)), + Values(10UL))); + +INSTANTIATE_TEST_CASE_P(FileEmulatingPipeline, GStreamerSourceTest, + Combine(Values("videotestsrc pattern=colors num-buffers=10 ! " + "videorate ! videoscale ! " + "video/x-raw,width=640,height=420,framerate=3/1 ! " + "appsink"), + Values(cv::Size(640, 420)), + Values(10UL))); + +INSTANTIATE_TEST_CASE_P(MultipleLiveSources, GStreamerSourceTest, + Combine(Values("videotestsrc is-live=true pattern=colors num-buffers=10 ! " + "videoscale ! video/x-raw,width=1280,height=720 ! appsink " + "videotestsrc is-live=true pattern=colors num-buffers=10 ! " + "fakesink"), + Values(cv::Size(1280, 720)), + Values(10UL))); + +INSTANTIATE_TEST_CASE_P(MultipleNotLiveSources, GStreamerSourceTest, + Combine(Values("videotestsrc pattern=colors num-buffers=10 ! " + "videoscale ! video/x-raw,width=1280,height=720 ! appsink " + "videotestsrc pattern=colors num-buffers=10 ! " + "fakesink"), + Values(cv::Size(1280, 720)), + Values(10UL))); + + +TEST(GStreamerMultiSourceSmokeTest, Test) +{ + // Graph declaration: + cv::GMat in1, in2; + auto out = cv::gapi::add(in1, in2); + cv::GComputation c(cv::GIn(in1, in2), cv::GOut(out)); + + // Graph compilation for streaming mode: + auto ccomp = c.compileStreaming(); + + EXPECT_TRUE(ccomp); + EXPECT_FALSE(ccomp.running()); + + cv::gapi::wip::GStreamerPipeline + pipeline("videotestsrc is-live=true pattern=colors num-buffers=10 ! " + "videorate ! videoscale ! " + "video/x-raw,width=1920,height=1080,framerate=3/1 ! " + "appsink name=sink1 " + "videotestsrc is-live=true pattern=colors num-buffers=10 ! " + "videorate ! videoscale ! " + "video/x-raw,width=1920,height=1080,framerate=3/1 ! " + "appsink name=sink2"); + + // GStreamer streaming sources configuration: + auto src1 = pipeline.getStreamingSource("sink1"); + auto src2 = pipeline.getStreamingSource("sink2"); + + ccomp.setSource(cv::gin(src1, src2)); + + // Start of streaming: + ccomp.start(); + EXPECT_TRUE(ccomp.running()); + + // Streaming - pulling of frames until the end: + cv::Mat in_mat_gapi; + + EXPECT_TRUE(ccomp.pull(cv::gout(in_mat_gapi))); + EXPECT_TRUE(!in_mat_gapi.empty()); + EXPECT_EQ(CV_8UC3, in_mat_gapi.type()); + + while (ccomp.pull(cv::gout(in_mat_gapi))) { + EXPECT_TRUE(!in_mat_gapi.empty()); + EXPECT_EQ(CV_8UC3, in_mat_gapi.type()); + } + + EXPECT_FALSE(ccomp.running()); + ccomp.stop(); + + EXPECT_FALSE(ccomp.running()); +} + +struct GStreamerMultiSourceTest : + public TestWithParam> +{ }; + +TEST_P(GStreamerMultiSourceTest, ImageDataTest) +{ + std::string pathToLeftIm = findDataFile("cv/stereomatching/datasets/tsukuba/im6.png"); + std::string pathToRightIm = findDataFile("cv/stereomatching/datasets/tsukuba/im2.png"); + + std::string pipelineToReadImage("filesrc location=LOC ! pngdec ! videoconvert ! " + "videoscale ! video/x-raw,format=NV12 ! appsink"); + + cv::gapi::wip::GStreamerSource leftImageProvider( + std::regex_replace(pipelineToReadImage, std::regex("LOC"), pathToLeftIm)); + cv::gapi::wip::GStreamerSource rightImageProvider( + std::regex_replace(pipelineToReadImage, std::regex("LOC"), pathToRightIm)); + + cv::gapi::wip::Data leftImData, rightImData; + leftImageProvider.pull(leftImData); + rightImageProvider.pull(rightImData); + + cv::Mat leftRefMat = cv::util::get(leftImData); + cv::Mat rightRefMat = cv::util::get(rightImData); + + // Retrieve test parameters: + std::tuple params = GetParam(); + cv::GComputation extractImage = std::move(std::get<0>(params)); + cv::gapi::wip::GStreamerSource::OutputType outputType = std::get<1>(params); + + // Graph compilation for streaming mode: + auto compiled = + extractImage.compileStreaming(); + + EXPECT_TRUE(compiled); + EXPECT_FALSE(compiled.running()); + + cv::gapi::wip::GStreamerPipeline + pipeline(std::string("multifilesrc location=" + pathToLeftIm + " index=0 loop=true ! " + "pngdec ! videoconvert ! videoscale ! video/x-raw,format=NV12 ! " + "appsink name=sink1 ") + + std::string("multifilesrc location=" + pathToRightIm + " index=0 loop=true ! " + "pngdec ! videoconvert ! videoscale ! video/x-raw,format=NV12 ! " + "appsink name=sink2")); + + // GStreamer streaming sources configuration: + auto src1 = pipeline.getStreamingSource("sink1", outputType); + auto src2 = pipeline.getStreamingSource("sink2", outputType); + + compiled.setSource(cv::gin(src1, src2)); + + // Start of streaming: + compiled.start(); + EXPECT_TRUE(compiled.running()); + + // Streaming - pulling of frames: + cv::Mat in_mat1, in_mat2; + + std::size_t counter { }, limit { 10 }; + while(compiled.pull(cv::gout(in_mat1, in_mat2)) && (counter < limit)) { + EXPECT_EQ(0, cv::norm(in_mat1, leftRefMat, cv::NORM_INF)); + EXPECT_EQ(0, cv::norm(in_mat2, rightRefMat, cv::NORM_INF)); + ++counter; + } + + compiled.stop(); + + EXPECT_FALSE(compiled.running()); +} + +INSTANTIATE_TEST_CASE_P(GStreamerMultiSourceViaGMatsTest, GStreamerMultiSourceTest, + Combine(Values(cv::GComputation([]() + { + cv::GMat in1, in2; + return cv::GComputation(cv::GIn(in1, in2), + cv::GOut(cv::gapi::copy(in1), + cv::gapi::copy(in2))); + })), + Values(cv::gapi::wip::GStreamerSource::OutputType::MAT))); + +INSTANTIATE_TEST_CASE_P(GStreamerMultiSourceViaGFramesTest, GStreamerMultiSourceTest, + Combine(Values(cv::GComputation([]() + { + cv::GFrame in1, in2; + return cv::GComputation(cv::GIn(in1, in2), + cv::GOut(cv::gapi::streaming::BGR(in1), + cv::gapi::streaming::BGR(in2))); + })), + Values(cv::gapi::wip::GStreamerSource::OutputType::FRAME))); +} // namespace opencv_test + +#endif // HAVE_GSTREAMER