Merge pull request #21022 from sivanov-work:async_mfp_demux

G-API: oneVPL Implement asynchronous MFP demux data provider

* Add dummy dmux

* Initial commit for draft versionn

* Demux for low res file works

* Add media source resolver to work over incorrect MIME

* Add MFP Demux logger

* stash changes

* Extend IDataProvider with CodecId, Add troubleshooting info

* Add IDapaProvider dispatcher

* Add ComPtrGuard wrappers

* Add new unit test scope for MFP demux & Add minor changes

* Enhance UTs

* Remove ATL header

* Remove ATL another one

* Fix build

* Add static for some methods

* Initial commit

* Add async demuxing

* Apply tdd idea

* Intro IDataProvider changes: +fetch_bitstream, -fetch_data

* Fix UTs

* Remove IDataProvider::CodecId & Fix EOF hang

* Remove sync demux

* Remove mfp async dependencies

* Remove VPL dependencies from IDataProvider declaration

* Apply comments

* Fix compilation

* Suppress unused warning

* Apply some comments

* Apply some comments

* Apply comments
pull/21113/head
Sergey Ivanov 3 years ago committed by GitHub
parent ac4b592b4e
commit 02f08879a4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      modules/gapi/CMakeLists.txt
  2. 57
      modules/gapi/include/opencv2/gapi/streaming/onevpl/data_provider_interface.hpp
  3. 40
      modules/gapi/src/streaming/onevpl/cfg_param_device_selector.cpp
  4. 32
      modules/gapi/src/streaming/onevpl/data_provider_defines.hpp
  5. 68
      modules/gapi/src/streaming/onevpl/data_provider_dispatcher.cpp
  6. 29
      modules/gapi/src/streaming/onevpl/data_provider_dispatcher.hpp
  7. 37
      modules/gapi/src/streaming/onevpl/data_provider_interface_exception.cpp
  8. 820
      modules/gapi/src/streaming/onevpl/demux/async_mfp_demux_data_provider.cpp
  9. 126
      modules/gapi/src/streaming/onevpl/demux/async_mfp_demux_data_provider.hpp
  10. 17
      modules/gapi/src/streaming/onevpl/engine/decode/decode_engine_legacy.cpp
  11. 2
      modules/gapi/src/streaming/onevpl/engine/engine_session.cpp
  12. 7
      modules/gapi/src/streaming/onevpl/engine/engine_session.hpp
  13. 28
      modules/gapi/src/streaming/onevpl/engine/processing_engine_base.cpp
  14. 3
      modules/gapi/src/streaming/onevpl/engine/processing_engine_base.hpp
  15. 124
      modules/gapi/src/streaming/onevpl/file_data_provider.cpp
  16. 13
      modules/gapi/src/streaming/onevpl/file_data_provider.hpp
  17. 4
      modules/gapi/src/streaming/onevpl/source.cpp
  18. 66
      modules/gapi/src/streaming/onevpl/source_priv.cpp
  19. 2
      modules/gapi/src/streaming/onevpl/source_priv.hpp
  20. 37
      modules/gapi/src/streaming/onevpl/utils.cpp
  21. 6
      modules/gapi/src/streaming/onevpl/utils.hpp
  22. 40
      modules/gapi/test/streaming/gapi_streaming_tests.cpp
  23. 9
      modules/gapi/test/streaming/gapi_streaming_vpl_core_test.cpp
  24. 304
      modules/gapi/test/streaming/gapi_streaming_vpl_data_provider.cpp

@ -180,6 +180,8 @@ set(gapi_srcs
src/streaming/onevpl/engine/processing_engine_base.cpp
src/streaming/onevpl/engine/decode/decode_engine_legacy.cpp
src/streaming/onevpl/engine/decode/decode_session.cpp
src/streaming/onevpl/demux/async_mfp_demux_data_provider.cpp
src/streaming/onevpl/data_provider_dispatcher.cpp
src/streaming/onevpl/cfg_param_device_selector.cpp
src/streaming/onevpl/device_selector_interface.cpp

@ -7,28 +7,39 @@
#ifndef GAPI_STREAMING_ONEVPL_ONEVPL_DATA_PROVIDER_INTERFACE_HPP
#define GAPI_STREAMING_ONEVPL_ONEVPL_DATA_PROVIDER_INTERFACE_HPP
#include <exception>
#include <memory>
#include <string>
#include <opencv2/gapi/own/exports.hpp> // GAPI_EXPORTS
namespace cv {
namespace gapi {
namespace wip {
namespace onevpl {
struct GAPI_EXPORTS DataProviderException : public std::exception {
virtual ~DataProviderException() {}
};
DataProviderException(const std::string& descr);
DataProviderException(std::string&& descr);
struct GAPI_EXPORTS DataProviderSystemErrorException : public DataProviderException {
DataProviderSystemErrorException(int error_code, const std::string& desription = std::string());
virtual ~DataProviderSystemErrorException();
virtual ~DataProviderException() = default;
virtual const char* what() const noexcept override;
private:
std::string reason;
};
struct GAPI_EXPORTS DataProviderSystemErrorException final : public DataProviderException {
DataProviderSystemErrorException(int error_code, const std::string& desription = std::string());
~DataProviderSystemErrorException() = default;
};
struct GAPI_EXPORTS DataProviderUnsupportedException final : public DataProviderException {
DataProviderUnsupportedException(const std::string& description);
~DataProviderUnsupportedException() = default;
};
struct GAPI_EXPORTS DataProviderImplementationException : public DataProviderException {
DataProviderImplementationException(const std::string& description);
~DataProviderImplementationException() = default;
};
/**
* @brief Public interface allows to customize extraction of video stream data
* used by onevpl::GSource instead of reading stream from file (by default).
@ -41,21 +52,41 @@ private:
*/
struct GAPI_EXPORTS IDataProvider {
using Ptr = std::shared_ptr<IDataProvider>;
using mfx_codec_id_type = uint32_t;
/**
* NB: here is supposed to be forward declaration of mfxBitstream
* But according to current oneVPL implementation it is impossible to forward
* declare untagged struct mfxBitstream.
*
* IDataProvider makes sense only for HAVE_VPL is ON and to keep IDataProvider
* interface API/ABI compliant between core library and user application layer
* let's introduce wrapper mfx_bitstream which inherits mfxBitstream in private
* G-API code section and declare forward for wrapper mfx_bitstream here
*/
struct mfx_bitstream;
virtual ~IDataProvider() {}
virtual ~IDataProvider() = default;
/**
* The function is used by onevpl::GSource to extract codec id from data
*
*/
virtual mfx_codec_id_type get_mfx_codec_id() const = 0;
/**
* The function is used by onevpl::GSource to extract binary data stream from @ref IDataProvider
* implementation.
*
* It MUST throw `DataProviderException` kind exceptions in fail cases.
* It MUST return 0 in EOF which considered as not-fail case.
* It MUST return MFX_ERR_MORE_DATA in EOF which considered as not-fail case.
*
* @param out_data_bytes_size the available capacity of out_data buffer.
* @param out_data the output consumer buffer with capacity out_data_bytes_size.
* @return fetched bytes count.
* @param in_out_bitsream the input-output reference on MFX bitstream buffer which MUST be empty at the first request
* to allow implementation to allocate it by itself and to return back. Subsequent invocation of `fetch_bitstream_data`
* MUST use the previously used in_out_bitsream to avoid skipping rest of frames which haven't been consumed
* @return true for fetched data, false on EOF and throws exception on error
*/
virtual size_t fetch_data(size_t out_data_bytes_size, void* out_data) = 0;
virtual bool fetch_bitstream_data(std::shared_ptr<mfx_bitstream> &in_out_bitsream) = 0;
/**
* The function is used by onevpl::GSource to check more binary data availability.

@ -10,6 +10,7 @@
#include <opencv2/gapi/util/variant.hpp>
#include "streaming/onevpl/cfg_param_device_selector.hpp"
#include "streaming/onevpl/cfg_params_parser.hpp"
#include "streaming/onevpl/utils.hpp"
#include "logger.hpp"
@ -37,45 +38,6 @@ namespace gapi {
namespace wip {
namespace onevpl {
// TODO Will be changed on generic function from `onevpl_param_parser` as soons as feature merges
static mfxVariant cfg_param_to_mfx_variant(const CfgParam& accel_param) {
mfxVariant ret;
const CfgParam::value_t& accel_val = accel_param.get_value();
if (!cv::util::holds_alternative<std::string>(accel_val)) {
// expected string or uint32_t as value
if (!cv::util::holds_alternative<uint32_t>(accel_val)) {
throw std::logic_error("Incorrect value type of \"mfxImplDescription.AccelerationMode\" "
" std::string is expected" );
}
ret.Type = MFX_VARIANT_TYPE_U32;
ret.Data.U32 = cv::util::get<uint32_t>(accel_val);
return ret;
}
const std::string& accel_val_str = cv::util::get<std::string>(accel_val);
ret.Type = MFX_VARIANT_TYPE_U32;
if (accel_val_str == "MFX_ACCEL_MODE_NA") {
ret.Data.U32 = MFX_ACCEL_MODE_NA;
} else if (accel_val_str == "MFX_ACCEL_MODE_VIA_D3D9") {
ret.Data.U32 = MFX_ACCEL_MODE_VIA_D3D9;
} else if (accel_val_str == "MFX_ACCEL_MODE_VIA_D3D11") {
ret.Data.U32 = MFX_ACCEL_MODE_VIA_D3D11;
} else if (accel_val_str == "MFX_ACCEL_MODE_VIA_VAAPI") {
ret.Data.U32 = MFX_ACCEL_MODE_VIA_VAAPI;
} else if (accel_val_str == "MFX_ACCEL_MODE_VIA_VAAPI_DRM_MODESET") {
ret.Data.U32 = MFX_ACCEL_MODE_VIA_VAAPI_DRM_MODESET;
} else if (accel_val_str == "MFX_ACCEL_MODE_VIA_VAAPI_GLX") {
ret.Data.U32 = MFX_ACCEL_MODE_VIA_VAAPI_GLX;
} else if (accel_val_str == "MFX_ACCEL_MODE_VIA_VAAPI_X11") {
ret.Data.U32 = MFX_ACCEL_MODE_VIA_VAAPI_X11;
} else if (accel_val_str == "MFX_ACCEL_MODE_VIA_VAAPI_WAYLAND") {
ret.Data.U32 = MFX_ACCEL_MODE_VIA_VAAPI_WAYLAND;
} else if (accel_val_str == "MFX_ACCEL_MODE_VIA_HDDLUNITE") {
ret.Data.U32 = MFX_ACCEL_MODE_VIA_HDDLUNITE;
}
return ret;
}
CfgParamDeviceSelector::CfgParamDeviceSelector(const CfgParams& cfg_params) :
suggested_device(IDeviceSelector::create<Device>(nullptr, "CPU", AccelType::HOST)),
suggested_context(IDeviceSelector::create<Context>(nullptr, AccelType::HOST)) {

@ -0,0 +1,32 @@
#ifndef GAPI_STREAMING_ONEVPL_DATA_PROVIDER_DEFINES_HPP
#define GAPI_STREAMING_ONEVPL_DATA_PROVIDER_DEFINES_HPP
#ifdef HAVE_ONEVPL
#include <vpl/mfxcommon.h>
#include <vpl/mfxvideo.h>
#endif // HAVE_ONEVPL
#include <opencv2/gapi/own/assert.hpp>
#include <opencv2/gapi/streaming/onevpl/data_provider_interface.hpp>
namespace cv {
namespace gapi {
namespace wip {
namespace onevpl {
#ifdef HAVE_ONEVPL
struct IDataProvider::mfx_bitstream : public mfxBitstream {};
#else // HAVE_ONEVPL
struct IDataProvider::mfx_bitstream {
mfx_bitstream() {
GAPI_Assert(false && "Reject to create `mfxBitstream` because library compiled without VPL/MFX support");
}
};
#endif // HAVE_ONEVPL
} // namespace onevpl
} // namespace wip
} // namespace gapi
} // namespace cv
#endif // GAPI_STREAMING_ONEVPL_DATA_PROVIDER_DEFINES_HPP

@ -0,0 +1,68 @@
// This file is part of OpenCV project.
// It is subject to the license terms in the LICENSE file found in the top-level directory
// of this distribution and at http://opencv.org/license.html.
//
// Copyright (C) 2021 Intel Corporation
#ifdef HAVE_ONEVPL
#include "streaming/onevpl/data_provider_dispatcher.hpp"
#include "streaming/onevpl/file_data_provider.hpp"
#include "streaming/onevpl/demux/async_mfp_demux_data_provider.hpp"
#include "logger.hpp"
namespace cv {
namespace gapi {
namespace wip {
namespace onevpl {
IDataProvider::Ptr DataProviderDispatcher::create(const std::string& file_path,
const std::vector<CfgParam> &cfg_params) {
GAPI_LOG_INFO(nullptr, "try select suitable IDataProvider for source: " <<
file_path);
IDataProvider::Ptr provider;
// Look-up CodecId from input params
// If set then raw data provider is preferred
GAPI_LOG_DEBUG(nullptr, "try find explicit cfg param\"mfxImplDescription.mfxDecoderDescription.decoder.CodecID\"");
auto codec_it =
std::find_if(cfg_params.begin(), cfg_params.end(), [] (const CfgParam& value) {
return value.get_name() == "mfxImplDescription.mfxDecoderDescription.decoder.CodecID";
});
if (codec_it != cfg_params.end()) {
GAPI_LOG_DEBUG(nullptr, "Dispatcher found \"mfxImplDescription.mfxDecoderDescription.decoder.CodecID\""
" so try on raw data provider at first");
try {
provider = std::make_shared<FileDataProvider>(file_path, cfg_params);
GAPI_LOG_INFO(nullptr, "raw data provider created");
} catch (const DataProviderUnsupportedException& ex) {
GAPI_LOG_INFO(nullptr, "raw data provider creation is failed, reason: " <<
ex.what());
}
}
if (!provider) {
GAPI_LOG_DEBUG(nullptr, "Try on MFP data provider");
try {
provider = std::make_shared<MFPAsyncDemuxDataProvider>(file_path);
GAPI_LOG_INFO(nullptr, "MFP data provider created");
} catch (const DataProviderUnsupportedException& ex) {
GAPI_LOG_INFO(nullptr, "MFP data provider creation is failed, reason: " <<
ex.what());
}
}
// final check
if (!provider) {
GAPI_LOG_WARNING(nullptr, "Cannot find suitable data provider");
throw DataProviderUnsupportedException("Unsupported source or configuration parameters");;
}
return provider;
}
} // namespace onevpl
} // namespace wip
} // namespace gapi
} // namespace cv
#endif // HAVE_ONEVPL

@ -0,0 +1,29 @@
// This file is part of OpenCV project.
// It is subject to the license terms in the LICENSE file found in the top-level directory
// of this distribution and at http://opencv.org/license.html.
//
// Copyright (C) 2021 Intel Corporation
#ifndef GAPI_STREAMING_ONEVPL_DATA_PROVIDER_DISPATCHER_HPP
#define GAPI_STREAMING_ONEVPL_DATA_PROVIDER_DISPATCHER_HPP
#ifdef HAVE_ONEVPL
#include <opencv2/gapi/streaming/onevpl/data_provider_interface.hpp>
#include <opencv2/gapi/streaming/onevpl/cfg_params.hpp>
namespace cv {
namespace gapi {
namespace wip {
namespace onevpl {
struct GAPI_EXPORTS DataProviderDispatcher {
static IDataProvider::Ptr create(const std::string& file_path,
const std::vector<CfgParam> &codec_params = {});
};
} // namespace onevpl
} // namespace wip
} // namespace gapi
} // namespace cv
#endif // HAVE_ONEVPL
#endif // GAPI_STREAMING_ONEVPL_DATA_PROVIDER_DISPATCHER_HPP

@ -1,3 +1,14 @@
// This file is part of OpenCV project.
// It is subject to the license terms in the LICENSE file found in the top-level directory
// of this distribution and at http://opencv.org/license.html.
//
// Copyright (C) 2021 Intel Corporation
#ifdef HAVE_ONEVPL
#include <vpl/mfxvideo.h>
#include <vpl/mfxjpeg.h>
#endif // HAVE_ONEVPL
#include <errno.h>
#include <string.h>
@ -7,15 +18,31 @@ namespace cv {
namespace gapi {
namespace wip {
namespace onevpl {
DataProviderSystemErrorException::DataProviderSystemErrorException(int error_code, const std::string& desription) {
reason = desription + ", error: " + std::to_string(error_code) + ", desctiption: " + strerror(error_code);
}
DataProviderSystemErrorException::~DataProviderSystemErrorException() = default;
DataProviderException::DataProviderException(const std::string& descr) :
reason(descr) {
}
DataProviderException::DataProviderException(std::string&& descr) :
reason(std::move(descr)) {
}
const char* DataProviderSystemErrorException::what() const noexcept {
const char* DataProviderException::what() const noexcept {
return reason.c_str();
}
DataProviderSystemErrorException::DataProviderSystemErrorException(int error_code,
const std::string& description) :
DataProviderException(description + ", error code: " + std::to_string(error_code) + " - " + strerror(error_code)) {
}
DataProviderUnsupportedException::DataProviderUnsupportedException(const std::string& description) :
DataProviderException(description) {
}
DataProviderImplementationException::DataProviderImplementationException(const std::string& description) :
DataProviderException(description) {
}
} // namespace onevpl
} // namespace wip
} // namespace gapi

@ -0,0 +1,820 @@
// This file is part of OpenCV project.
// It is subject to the license terms in the LICENSE file found in the top-level directory
// of this distribution and at http://opencv.org/license.html.
//
// Copyright (C) 2021 Intel Corporation
#ifdef HAVE_ONEVPL
#include <errno.h>
#ifdef _WIN32
#pragma comment(lib, "Mf.lib")
#pragma comment(lib, "Mfuuid.lib")
#pragma comment(lib, "Mfplat.lib")
#pragma comment(lib, "shlwapi.lib")
#pragma comment(lib, "mfreadwrite.lib")
#endif // _WIN32
#include "streaming/onevpl/demux/async_mfp_demux_data_provider.hpp"
#include "logger.hpp"
namespace cv {
namespace gapi {
namespace wip {
namespace onevpl {
#ifdef _WIN32
static HRESULT create_media_source(const std::string& url, IMFMediaSource **ppSource) {
wchar_t sURL[MAX_PATH];
GAPI_Assert(url.size() < MAX_PATH && "Windows MAX_PATH limit was reached");
size_t ret_url_length = 0;
mbstowcs_s(&ret_url_length, sURL, url.data(), url.size());
HRESULT hr = S_OK;
ComPtrGuard<IMFSourceResolver> source_resolver = createCOMPtrGuard<IMFSourceResolver>();
{
IMFSourceResolver *source_resolver_tmp = nullptr;
hr = MFCreateSourceResolver(&source_resolver_tmp);
if (FAILED(hr)) {
throw DataProviderSystemErrorException(HRESULT_CODE(hr),
"cannot create MFCreateSourceResolver from URI: " +
url);
}
source_resolver.reset(source_resolver_tmp);
}
MF_OBJECT_TYPE ObjectType = MF_OBJECT_INVALID;
/**
* NB:
* CreateObjectFromURL throws exception if actual container type is mismatched with
* file extension. To overcome this situation by MFP it is possible to apply 2 step
* approach: at first step we pass special flag
* `MF_RESOLUTION_KEEP_BYTE_STREAM_ALIVE_ON_FAIL` which claims to fail with error
* in any case of input instead exception throwing;
* at the second step we must cease `MF_RESOLUTION_KEEP_BYTE_STREAM_ALIVE_ON_FAIL`
* flag AND set another special flag
* `MF_RESOLUTION_CONTENT_DOES_NOT_HAVE_TO_MATCH_EXTENSION_OR_MIME_TYPE`
* to filter out container type & file extension mismatch errors.
*
* If it failed at second phase then some other errors were not related
* to types-extension disturbance would happen and data provider must fail ultimately.
*
* If second step passed then data provider would continue execution
*/
IUnknown *source_unknown_tmp = nullptr;
DWORD resolver_flags = MF_RESOLUTION_MEDIASOURCE | MF_RESOLUTION_READ |
MF_RESOLUTION_KEEP_BYTE_STREAM_ALIVE_ON_FAIL;
hr = source_resolver->CreateObjectFromURL(sURL,
resolver_flags,
nullptr, &ObjectType,
&source_unknown_tmp);
if (FAILED(hr)) {
GAPI_LOG_DEBUG(nullptr, "Cannot create MF_RESOLUTION_MEDIASOURCE using file extension, "
" looks like actual media container type doesn't match to file extension. "
"Try special mode");
resolver_flags ^= MF_RESOLUTION_KEEP_BYTE_STREAM_ALIVE_ON_FAIL;
resolver_flags ^= MF_RESOLUTION_CONTENT_DOES_NOT_HAVE_TO_MATCH_EXTENSION_OR_MIME_TYPE;
hr = source_resolver->CreateObjectFromURL(sURL, resolver_flags,
nullptr, &ObjectType,
&source_unknown_tmp);
if (FAILED(hr)) {
GAPI_LOG_WARNING(nullptr, "Cannot create MF_RESOLUTION_MEDIASOURCE from URI: " <<
url << ". Abort");
throw DataProviderSystemErrorException(HRESULT_CODE(hr),
"CreateObjectFromURL failed");
}
}
ComPtrGuard<IUnknown> source_unknown = createCOMPtrGuard(source_unknown_tmp);
hr = source_unknown->QueryInterface(__uuidof(IMFMediaSource), (void**)ppSource);
if (FAILED(hr)) {
throw DataProviderSystemErrorException(HRESULT_CODE(hr),
"QueryInterface for IMFMediaSource failed");
}
return hr;
}
/*
* The next part of converting GUID into string function GetGUIDNameConst
* was copied and modified from
* https://docs.microsoft.com/en-us/windows/win32/medfound/media-type-debugging-code
*/
#ifndef IF_EQUAL_RETURN
#define IF_EQUAL_RETURN(param, val) if(val == param) return #val
#endif
static const char* GetGUIDNameConst(const GUID& guid)
{
IF_EQUAL_RETURN(guid, MF_MT_MAJOR_TYPE);
IF_EQUAL_RETURN(guid, MF_MT_MAJOR_TYPE);
IF_EQUAL_RETURN(guid, MF_MT_SUBTYPE);
IF_EQUAL_RETURN(guid, MF_MT_ALL_SAMPLES_INDEPENDENT);
IF_EQUAL_RETURN(guid, MF_MT_FIXED_SIZE_SAMPLES);
IF_EQUAL_RETURN(guid, MF_MT_COMPRESSED);
IF_EQUAL_RETURN(guid, MF_MT_SAMPLE_SIZE);
IF_EQUAL_RETURN(guid, MF_MT_WRAPPED_TYPE);
IF_EQUAL_RETURN(guid, MF_MT_AUDIO_NUM_CHANNELS);
IF_EQUAL_RETURN(guid, MF_MT_AUDIO_SAMPLES_PER_SECOND);
IF_EQUAL_RETURN(guid, MF_MT_AUDIO_FLOAT_SAMPLES_PER_SECOND);
IF_EQUAL_RETURN(guid, MF_MT_AUDIO_AVG_BYTES_PER_SECOND);
IF_EQUAL_RETURN(guid, MF_MT_AUDIO_BLOCK_ALIGNMENT);
IF_EQUAL_RETURN(guid, MF_MT_AUDIO_BITS_PER_SAMPLE);
IF_EQUAL_RETURN(guid, MF_MT_AUDIO_VALID_BITS_PER_SAMPLE);
IF_EQUAL_RETURN(guid, MF_MT_AUDIO_SAMPLES_PER_BLOCK);
IF_EQUAL_RETURN(guid, MF_MT_AUDIO_CHANNEL_MASK);
IF_EQUAL_RETURN(guid, MF_MT_AUDIO_FOLDDOWN_MATRIX);
IF_EQUAL_RETURN(guid, MF_MT_AUDIO_WMADRC_PEAKREF);
IF_EQUAL_RETURN(guid, MF_MT_AUDIO_WMADRC_PEAKTARGET);
IF_EQUAL_RETURN(guid, MF_MT_AUDIO_WMADRC_AVGREF);
IF_EQUAL_RETURN(guid, MF_MT_AUDIO_WMADRC_AVGTARGET);
IF_EQUAL_RETURN(guid, MF_MT_AUDIO_PREFER_WAVEFORMATEX);
IF_EQUAL_RETURN(guid, MF_MT_AAC_PAYLOAD_TYPE);
IF_EQUAL_RETURN(guid, MF_MT_AAC_AUDIO_PROFILE_LEVEL_INDICATION);
IF_EQUAL_RETURN(guid, MF_MT_FRAME_SIZE);
IF_EQUAL_RETURN(guid, MF_MT_FRAME_RATE);
IF_EQUAL_RETURN(guid, MF_MT_FRAME_RATE_RANGE_MAX);
IF_EQUAL_RETURN(guid, MF_MT_FRAME_RATE_RANGE_MIN);
IF_EQUAL_RETURN(guid, MF_MT_PIXEL_ASPECT_RATIO);
IF_EQUAL_RETURN(guid, MF_MT_DRM_FLAGS);
IF_EQUAL_RETURN(guid, MF_MT_PAD_CONTROL_FLAGS);
IF_EQUAL_RETURN(guid, MF_MT_SOURCE_CONTENT_HINT);
IF_EQUAL_RETURN(guid, MF_MT_VIDEO_CHROMA_SITING);
IF_EQUAL_RETURN(guid, MF_MT_INTERLACE_MODE);
IF_EQUAL_RETURN(guid, MF_MT_TRANSFER_FUNCTION);
IF_EQUAL_RETURN(guid, MF_MT_VIDEO_PRIMARIES);
IF_EQUAL_RETURN(guid, MF_MT_CUSTOM_VIDEO_PRIMARIES);
IF_EQUAL_RETURN(guid, MF_MT_YUV_MATRIX);
IF_EQUAL_RETURN(guid, MF_MT_VIDEO_LIGHTING);
IF_EQUAL_RETURN(guid, MF_MT_VIDEO_NOMINAL_RANGE);
IF_EQUAL_RETURN(guid, MF_MT_GEOMETRIC_APERTURE);
IF_EQUAL_RETURN(guid, MF_MT_MINIMUM_DISPLAY_APERTURE);
IF_EQUAL_RETURN(guid, MF_MT_PAN_SCAN_APERTURE);
IF_EQUAL_RETURN(guid, MF_MT_PAN_SCAN_ENABLED);
IF_EQUAL_RETURN(guid, MF_MT_AVG_BITRATE);
IF_EQUAL_RETURN(guid, MF_MT_AVG_BIT_ERROR_RATE);
IF_EQUAL_RETURN(guid, MF_MT_MAX_KEYFRAME_SPACING);
IF_EQUAL_RETURN(guid, MF_MT_DEFAULT_STRIDE);
IF_EQUAL_RETURN(guid, MF_MT_PALETTE);
IF_EQUAL_RETURN(guid, MF_MT_USER_DATA);
IF_EQUAL_RETURN(guid, MF_MT_AM_FORMAT_TYPE);
IF_EQUAL_RETURN(guid, MF_MT_MPEG_START_TIME_CODE);
IF_EQUAL_RETURN(guid, MF_MT_MPEG2_PROFILE);
IF_EQUAL_RETURN(guid, MF_MT_MPEG2_LEVEL);
IF_EQUAL_RETURN(guid, MF_MT_MPEG2_FLAGS);
IF_EQUAL_RETURN(guid, MF_MT_MPEG_SEQUENCE_HEADER);
IF_EQUAL_RETURN(guid, MF_MT_DV_AAUX_SRC_PACK_0);
IF_EQUAL_RETURN(guid, MF_MT_DV_AAUX_CTRL_PACK_0);
IF_EQUAL_RETURN(guid, MF_MT_DV_AAUX_SRC_PACK_1);
IF_EQUAL_RETURN(guid, MF_MT_DV_AAUX_CTRL_PACK_1);
IF_EQUAL_RETURN(guid, MF_MT_DV_VAUX_SRC_PACK);
IF_EQUAL_RETURN(guid, MF_MT_DV_VAUX_CTRL_PACK);
IF_EQUAL_RETURN(guid, MF_MT_ARBITRARY_HEADER);
IF_EQUAL_RETURN(guid, MF_MT_ARBITRARY_FORMAT);
IF_EQUAL_RETURN(guid, MF_MT_IMAGE_LOSS_TOLERANT);
IF_EQUAL_RETURN(guid, MF_MT_MPEG4_SAMPLE_DESCRIPTION);
IF_EQUAL_RETURN(guid, MF_MT_MPEG4_CURRENT_SAMPLE_ENTRY);
IF_EQUAL_RETURN(guid, MF_MT_ORIGINAL_4CC);
IF_EQUAL_RETURN(guid, MF_MT_ORIGINAL_WAVE_FORMAT_TAG);
// Media types
IF_EQUAL_RETURN(guid, MFMediaType_Audio);
IF_EQUAL_RETURN(guid, MFMediaType_Video);
IF_EQUAL_RETURN(guid, MFMediaType_Protected);
IF_EQUAL_RETURN(guid, MFMediaType_SAMI);
IF_EQUAL_RETURN(guid, MFMediaType_Script);
IF_EQUAL_RETURN(guid, MFMediaType_Image);
IF_EQUAL_RETURN(guid, MFMediaType_HTML);
IF_EQUAL_RETURN(guid, MFMediaType_Binary);
IF_EQUAL_RETURN(guid, MFMediaType_FileTransfer);
IF_EQUAL_RETURN(guid, MFVideoFormat_AI44); // FCC('AI44')
IF_EQUAL_RETURN(guid, MFVideoFormat_ARGB32); // D3DFMT_A8R8G8B8
IF_EQUAL_RETURN(guid, MFVideoFormat_AV1);
IF_EQUAL_RETURN(guid, MFVideoFormat_AYUV); // FCC('AYUV')
IF_EQUAL_RETURN(guid, MFVideoFormat_DV25); // FCC('dv25')
IF_EQUAL_RETURN(guid, MFVideoFormat_DV50); // FCC('dv50')
IF_EQUAL_RETURN(guid, MFVideoFormat_DVH1); // FCC('dvh1')
IF_EQUAL_RETURN(guid, MFVideoFormat_DVSD); // FCC('dvsd')
IF_EQUAL_RETURN(guid, MFVideoFormat_DVSL); // FCC('dvsl')
IF_EQUAL_RETURN(guid, MFVideoFormat_H264); // FCC('H264')
IF_EQUAL_RETURN(guid, MFVideoFormat_H265);
IF_EQUAL_RETURN(guid, MFVideoFormat_HEVC);
IF_EQUAL_RETURN(guid, MFVideoFormat_HEVC_ES);
IF_EQUAL_RETURN(guid, MFVideoFormat_I420); // FCC('I420')
IF_EQUAL_RETURN(guid, MFVideoFormat_IYUV); // FCC('IYUV')
IF_EQUAL_RETURN(guid, MFVideoFormat_M4S2); // FCC('M4S2')
IF_EQUAL_RETURN(guid, MFVideoFormat_MJPG);
IF_EQUAL_RETURN(guid, MFVideoFormat_MP43); // FCC('MP43')
IF_EQUAL_RETURN(guid, MFVideoFormat_MP4S); // FCC('MP4S')
IF_EQUAL_RETURN(guid, MFVideoFormat_MP4V); // FCC('MP4V')
IF_EQUAL_RETURN(guid, MFVideoFormat_MPG1); // FCC('MPG1')
IF_EQUAL_RETURN(guid, MFVideoFormat_MSS1); // FCC('MSS1')
IF_EQUAL_RETURN(guid, MFVideoFormat_MSS2); // FCC('MSS2')
IF_EQUAL_RETURN(guid, MFVideoFormat_NV11); // FCC('NV11')
IF_EQUAL_RETURN(guid, MFVideoFormat_NV12); // FCC('NV12')
IF_EQUAL_RETURN(guid, MFVideoFormat_P010); // FCC('P010')
IF_EQUAL_RETURN(guid, MFVideoFormat_P016); // FCC('P016')
IF_EQUAL_RETURN(guid, MFVideoFormat_P210); // FCC('P210')
IF_EQUAL_RETURN(guid, MFVideoFormat_P216); // FCC('P216')
IF_EQUAL_RETURN(guid, MFVideoFormat_RGB24); // D3DFMT_R8G8B8
IF_EQUAL_RETURN(guid, MFVideoFormat_RGB32); // D3DFMT_X8R8G8B8
IF_EQUAL_RETURN(guid, MFVideoFormat_RGB555); // D3DFMT_X1R5G5B5
IF_EQUAL_RETURN(guid, MFVideoFormat_RGB565); // D3DFMT_R5G6B5
IF_EQUAL_RETURN(guid, MFVideoFormat_RGB8);
IF_EQUAL_RETURN(guid, MFVideoFormat_UYVY); // FCC('UYVY')
IF_EQUAL_RETURN(guid, MFVideoFormat_v210); // FCC('v210')
IF_EQUAL_RETURN(guid, MFVideoFormat_v410); // FCC('v410')
IF_EQUAL_RETURN(guid, MFVideoFormat_WMV1); // FCC('WMV1')
IF_EQUAL_RETURN(guid, MFVideoFormat_WMV2); // FCC('WMV2')
IF_EQUAL_RETURN(guid, MFVideoFormat_WMV3); // FCC('WMV3')
IF_EQUAL_RETURN(guid, MFVideoFormat_WVC1); // FCC('WVC1')
IF_EQUAL_RETURN(guid, MFVideoFormat_VP90);
IF_EQUAL_RETURN(guid, MFVideoFormat_Y210); // FCC('Y210')
IF_EQUAL_RETURN(guid, MFVideoFormat_Y216); // FCC('Y216')
IF_EQUAL_RETURN(guid, MFVideoFormat_Y410); // FCC('Y410')
IF_EQUAL_RETURN(guid, MFVideoFormat_Y416); // FCC('Y416')
IF_EQUAL_RETURN(guid, MFVideoFormat_Y41P);
IF_EQUAL_RETURN(guid, MFVideoFormat_Y41T);
IF_EQUAL_RETURN(guid, MFVideoFormat_YUY2); // FCC('YUY2')
IF_EQUAL_RETURN(guid, MFVideoFormat_YV12); // FCC('YV12')
IF_EQUAL_RETURN(guid, MFVideoFormat_YVYU);
IF_EQUAL_RETURN(guid, MFAudioFormat_PCM); // WAVE_FORMAT_PCM
IF_EQUAL_RETURN(guid, MFAudioFormat_Float); // WAVE_FORMAT_IEEE_FLOAT
IF_EQUAL_RETURN(guid, MFAudioFormat_DTS); // WAVE_FORMAT_DTS
IF_EQUAL_RETURN(guid, MFAudioFormat_Dolby_AC3_SPDIF); // WAVE_FORMAT_DOLBY_AC3_SPDIF
IF_EQUAL_RETURN(guid, MFAudioFormat_DRM); // WAVE_FORMAT_DRM
IF_EQUAL_RETURN(guid, MFAudioFormat_WMAudioV8); // WAVE_FORMAT_WMAUDIO2
IF_EQUAL_RETURN(guid, MFAudioFormat_WMAudioV9); // WAVE_FORMAT_WMAUDIO3
IF_EQUAL_RETURN(guid, MFAudioFormat_WMAudio_Lossless); // WAVE_FORMAT_WMAUDIO_LOSSLESS
IF_EQUAL_RETURN(guid, MFAudioFormat_WMASPDIF); // WAVE_FORMAT_WMASPDIF
IF_EQUAL_RETURN(guid, MFAudioFormat_MSP1); // WAVE_FORMAT_WMAVOICE9
IF_EQUAL_RETURN(guid, MFAudioFormat_MP3); // WAVE_FORMAT_MPEGLAYER3
IF_EQUAL_RETURN(guid, MFAudioFormat_MPEG); // WAVE_FORMAT_MPEG
IF_EQUAL_RETURN(guid, MFAudioFormat_AAC); // WAVE_FORMAT_MPEG_HEAAC
IF_EQUAL_RETURN(guid, MFAudioFormat_ADTS); // WAVE_FORMAT_MPEG_ADTS_AAC
return "<unknown>";
}
static IDataProvider::mfx_codec_id_type convert_to_mfx_codec_id(const GUID& guid) {
if (guid == MFVideoFormat_H264) {
return MFX_CODEC_AVC;
} else if (guid == MFVideoFormat_H265 ||
guid == MFVideoFormat_HEVC ||
guid == MFVideoFormat_HEVC_ES) {
return MFX_CODEC_HEVC;
} else if (guid == MFAudioFormat_MPEG) {
return MFX_CODEC_MPEG2;
} else if (guid == MFVideoFormat_WVC1) {
return MFX_CODEC_VC1;
} else if (guid == MFVideoFormat_VP90) {
return MFX_CODEC_VP9;
} else if (guid == MFVideoFormat_AV1) {
return MFX_CODEC_AV1;
} else if (guid == MFVideoFormat_MJPG) {
return MFX_CODEC_JPEG;
}
throw DataProviderUnsupportedException(std::string("unsupported codec type: ") +
GetGUIDNameConst(guid));
}
bool MFPAsyncDemuxDataProvider::select_supported_video_stream(
ComPtrGuard<IMFPresentationDescriptor> &descriptor,
mfx_codec_id_type &out_codec_id,
void *source_id) {
DWORD stream_count = 0;
BOOL is_stream_selected = false;
descriptor->GetStreamDescriptorCount(&stream_count);
GAPI_LOG_DEBUG(nullptr, "[" << source_id << "] " <<
"received stream count: " << stream_count);
for (DWORD stream_index = 0;
stream_index < stream_count && !is_stream_selected; stream_index++) {
GAPI_LOG_DEBUG(nullptr, "[" << source_id << "] " <<
"check stream info by index: " << stream_index);
IMFStreamDescriptor *stream_descriptor_tmp = nullptr;
descriptor->GetStreamDescriptorByIndex(stream_index, &is_stream_selected,
&stream_descriptor_tmp);
if (!stream_descriptor_tmp) {
GAPI_LOG_WARNING(nullptr, "[" << source_id << "] " <<
"Cannot get stream descriptor by index: " <<
stream_index);
continue;
}
ComPtrGuard<IMFStreamDescriptor> stream_descriptor =
createCOMPtrGuard(stream_descriptor_tmp);
is_stream_selected = false; // deselect until supported stream found
IMFMediaTypeHandler *handler_tmp = nullptr;
stream_descriptor->GetMediaTypeHandler(&handler_tmp);
if (!handler_tmp) {
GAPI_LOG_WARNING(nullptr, "[" << source_id << "] " <<
"Cannot get media type handler for stream by index: " <<
stream_index);
continue;
}
ComPtrGuard<IMFMediaTypeHandler> handler = createCOMPtrGuard(handler_tmp);
GUID guidMajorType;
if (FAILED(handler->GetMajorType(&guidMajorType))) {
GAPI_LOG_WARNING(nullptr, "[" << source_id << "] " <<
"Cannot get major GUID type for stream by index: " <<
stream_index);
continue;
}
if (guidMajorType != MFMediaType_Video) {
GAPI_LOG_DEBUG(nullptr, "[" << source_id << "] " <<
"Skipping non-video stream");
continue;
}
GAPI_LOG_DEBUG(nullptr, "[" << source_id << "] " <<
"video stream detected");
IMFMediaType *media_type_tmp = nullptr;
handler->GetCurrentMediaType(&media_type_tmp);
if (!media_type_tmp) {
GAPI_LOG_WARNING(nullptr, "[" << source_id << "] " <<
"Cannot determine media type for stream by index: " <<
stream_index);
continue;
}
ComPtrGuard<IMFMediaType> media_type = createCOMPtrGuard(media_type_tmp);
GUID subtype;
if (SUCCEEDED(media_type->GetGUID(MF_MT_SUBTYPE, &subtype))) {
GAPI_LOG_DEBUG(nullptr, "[" << source_id << "] " <<
"video type: " << GetGUIDNameConst(subtype));
std::string is_codec_supported("unsupported, skip...");
try {
out_codec_id = convert_to_mfx_codec_id(subtype);
is_stream_selected = true;
is_codec_supported = "selected!";
} catch (...) {}
GAPI_LOG_INFO(nullptr, "[" << source_id << "] " <<
"video stream index: " << stream_index <<
", codec: " << GetGUIDNameConst(subtype) <<
" - " << is_codec_supported)
} else {
GAPI_LOG_WARNING(nullptr, "[" << source_id << "] " <<
"Cannot get media GUID subtype for stream by index: " <<
stream_index);
continue;
}
}
return is_stream_selected;
}
MFPAsyncDemuxDataProvider::MFPAsyncDemuxDataProvider(const std::string& file_path,
size_t keep_preprocessed_buf_count_value) :
keep_preprocessed_buf_count(keep_preprocessed_buf_count_value),
source(createCOMPtrGuard<IMFMediaSource>()),
source_reader(createCOMPtrGuard<IMFSourceReader>()),
codec(std::numeric_limits<uint32_t>::max()),
provider_state(State::InProgress) {
submit_read_request.clear();
com_interface_reference_count = 1; // object itself
HRESULT hr = S_OK;
hr = MFStartup(MF_VERSION);
if (FAILED(hr)) {
throw DataProviderSystemErrorException(HRESULT_CODE(hr), "Cannot initialize MFStartup");
}
GAPI_LOG_INFO(nullptr, "[" << this << "] " <<
" initializing, URI " << file_path);
IMFMediaSource *source_tmp = nullptr;
hr = create_media_source(file_path, &source_tmp);
if (FAILED(hr)) {
throw DataProviderSystemErrorException(HRESULT_CODE(hr), "Cannot create IMFMediaSource");
}
source.reset(source_tmp);
GAPI_LOG_DEBUG(nullptr, "[" << this << "] " <<
" start creating source attributes");
IMFAttributes *attrs_tmp = nullptr;
// NB: create 2 attributes for disable converters & async callback capability
const UINT32 relevant_attributes_count = 2;
hr = MFCreateAttributes(&attrs_tmp, relevant_attributes_count);
if (FAILED(hr)) {
throw DataProviderSystemErrorException(HRESULT_CODE(hr), "MFCreateAttributes failed");
}
ComPtrGuard<IMFAttributes> attributes = createCOMPtrGuard(attrs_tmp);
hr = attributes->SetUINT32(MF_READWRITE_DISABLE_CONVERTERS, TRUE);
// set the callback pointer.
if (SUCCEEDED(hr))
{
hr = attributes->SetUnknown(
MF_SOURCE_READER_ASYNC_CALLBACK,
this
);
}
if (FAILED(hr)) {
throw DataProviderSystemErrorException(HRESULT_CODE(hr), "Cannot set MFP async callback ");
}
GAPI_LOG_DEBUG(nullptr, "[" << this << "] " <<
"is getting presentation descriptor");
IMFPresentationDescriptor* descriptor_tmp = nullptr;
hr = source->CreatePresentationDescriptor(&descriptor_tmp);
if (FAILED(hr)) {
throw DataProviderSystemErrorException(HRESULT_CODE(hr),
"CreatePresentationDescriptor failed");
}
ComPtrGuard<IMFPresentationDescriptor> descriptor = createCOMPtrGuard(descriptor_tmp);
if (!MFPAsyncDemuxDataProvider::select_supported_video_stream(descriptor, codec, this)) {
// NB: let's pretty notify clients about list of supported codecs to keep
// contract in explicit way to avoid continuous troubleshooting
const auto &supported_codecs = get_supported_mfx_codec_ids();
std::string ss;
for (mfxU32 id : supported_codecs) {
ss += mfx_codec_id_to_cstr(id);
ss += ", ";
}
if (!ss.empty()) {
ss.erase(ss.size() - 2, 2);
}
GAPI_LOG_WARNING(nullptr, "[" << this << "] "
"couldn't find video stream with supported params, "
"expected codecs: " << ss);
throw DataProviderUnsupportedException("couldn't find supported video stream");
}
GAPI_LOG_DEBUG(nullptr, "[" << this << "] " <<
"is creating media source");
IMFSourceReader *source_reader_tmp = nullptr;
hr = MFCreateSourceReaderFromMediaSource(source.get(), attributes.get(),
&source_reader_tmp);
if (FAILED(hr)) {
throw DataProviderSystemErrorException(HRESULT_CODE(hr),
"MFCreateSourceReaderFromMediaSource failed");
}
source_reader = createCOMPtrGuard(source_reader_tmp);
GAPI_LOG_DEBUG(nullptr, "[" << this << "] " <<
"created IMFSourceReader: " << source_reader);
// Ask for the first sample.
hr = request_next(hr, 0, 0);
if (FAILED(hr)) {
throw DataProviderSystemErrorException(HRESULT_CODE(hr),
"ReadSample failed while requesting initial sample");
}
GAPI_LOG_INFO(nullptr, "[" << this << "] " <<
"initialized");
}
MFPAsyncDemuxDataProvider::~MFPAsyncDemuxDataProvider() {
GAPI_LOG_INFO(nullptr, "[" << this << "] " <<
"begin deinitializing");
flush();
{
std::unique_lock<std::mutex> l(buffer_storage_mutex);
GAPI_LOG_INFO(nullptr, "Clean up async storage, count: " <<
worker_key_to_buffer_mapping_storage.size());
for (auto& buffer : worker_key_to_buffer_mapping_storage) {
if (buffer.second) {
buffer.second->Unlock();
}
}
worker_key_to_buffer_mapping_storage.clear();
}
GAPI_LOG_INFO(nullptr, "Clean data storage, elapsed buffer count: " <<
processing_key_to_buffer_mapping_storage.size());
for (auto& buffer : processing_key_to_buffer_mapping_storage) {
if (buffer.second) {
buffer.second->Unlock();
}
}
processing_key_to_buffer_mapping_storage.clear();
// release COM object before overall MFP shutdown
source_reader.reset();
source.reset();
MFShutdown();
GAPI_LOG_INFO(nullptr, "[" << this << "] " <<
"deinitialized");
}
ULONG MFPAsyncDemuxDataProvider::AddRef() {
// align behavior with InterlockedIncrement
return com_interface_reference_count.fetch_add(1) + 1;
}
ULONG MFPAsyncDemuxDataProvider::Release() {
auto count = com_interface_reference_count.fetch_sub(1);
GAPI_Assert(count != 0 && "Incorrect reference counting for MFPAsyncDemuxDataProvider");
count -= 1; // align behavior with InterlockedDecrement
return count;
}
HRESULT MFPAsyncDemuxDataProvider::QueryInterface(REFIID riid, void** ppv)
{
static const QITAB qit[] =
{
QITABENT(MFPAsyncDemuxDataProvider, IMFSourceReaderCallback),
{ 0 },
};
return QISearch(this, qit, riid, ppv);
}
STDMETHODIMP
MFPAsyncDemuxDataProvider::OnReadSample(HRESULT status, DWORD,
DWORD stream_flag, LONGLONG,
IMFSample *sample_ptr) {
GAPI_LOG_DEBUG(nullptr, "[" << this << "] status: " << std::to_string(HRESULT_CODE(status)) <<
", stream flags: " << stream_flag <<
", sample: " << sample_ptr);
HRESULT hr = S_OK;
if (FAILED(status)) {
hr = status;
}
// check EOF
if (stream_flag & MF_SOURCE_READERF_ENDOFSTREAM) {
GAPI_LOG_DEBUG(nullptr, "[" << this << "] EOF");
// close reader
provider_state.store(State::Exhausted);
buffer_storage_non_empty_cond.notify_all();
return hr;
}
submit_read_request.clear();
// extract stream data
size_t worker_buffer_count = 0;
if (SUCCEEDED(hr)) {
if (sample_ptr) {
// Get the video frame buffer from the sample.
IMFMediaBuffer *buffer_ptr = nullptr;
hr = sample_ptr->ConvertToContiguousBuffer(&buffer_ptr);
GAPI_Assert(SUCCEEDED(hr) &&
"MFPAsyncDemuxDataProvider::OnReadSample - ConvertToContiguousBuffer failed");
DWORD max_buffer_size = 0;
DWORD curr_size = 0;
// lock buffer directly into mfx bitstream
std::shared_ptr<mfx_bitstream> staging_stream = std::make_shared<mfx_bitstream>();
staging_stream->Data = nullptr;
hr = buffer_ptr->Lock(&staging_stream->Data, &max_buffer_size, &curr_size);
GAPI_Assert(SUCCEEDED(hr) &&
"MFPAsyncDemuxDataProvider::OnReadSample - Lock failed");
staging_stream->MaxLength = max_buffer_size;
staging_stream->DataLength = curr_size;
staging_stream->CodecId = get_mfx_codec_id();
GAPI_LOG_DEBUG(nullptr, "[" << this << "] bitstream created, data: " <<
static_cast<void*>(staging_stream->Data) <<
", MaxLength: " << staging_stream->MaxLength <<
", DataLength: " << staging_stream->DataLength);
worker_buffer_count = produce_worker_data(staging_stream->Data,
createCOMPtrGuard(buffer_ptr),
std::move(staging_stream));
}
} else {
GAPI_LOG_WARNING(nullptr, "[" << this << "] callback failed"
", status: " << std::to_string(HRESULT_CODE(status)) <<
", stream flags: " << stream_flag <<
", sample: " << sample_ptr);
}
hr = request_next(hr, stream_flag, worker_buffer_count);
return hr;
}
size_t MFPAsyncDemuxDataProvider::get_locked_buffer_size() const {
std::unique_lock<std::mutex> l(buffer_storage_mutex);
return worker_locked_buffer_storage.size();
}
STDMETHODIMP MFPAsyncDemuxDataProvider::OnEvent(DWORD, IMFMediaEvent *) {
return S_OK;
}
STDMETHODIMP MFPAsyncDemuxDataProvider::OnFlush(DWORD) {
provider_state.store(State::Exhausted);
buffer_storage_non_empty_cond.notify_all();
return S_OK;
}
void MFPAsyncDemuxDataProvider::flush() {
if(source_reader) {
GAPI_LOG_INFO(nullptr, "[" << this << "] set flush");
source_reader->Flush(static_cast<DWORD>(MF_SOURCE_READER_ALL_STREAMS));
}
size_t iterations = 0;
const int waiting_ms = 100;
const size_t warning_iteration_wait_count = 300; // approx 30 sec
while (provider_state.load() != State::Exhausted) {
iterations++;
if (iterations > warning_iteration_wait_count) {
GAPI_LOG_WARNING(nullptr, "[" << this << "] is still waiting for flush finishing, "
"iteration: " << iterations);
} else {
GAPI_LOG_DEBUG(nullptr, "[" << this << "] is waiting for flush finishing, "
"iteration: " << iterations);
}
std::unique_lock<std::mutex> l(buffer_storage_mutex);
buffer_storage_non_empty_cond.wait_for(l, std::chrono::milliseconds(waiting_ms));
}
GAPI_LOG_INFO(nullptr, "[" << this << "] has flushed in: " <<
iterations * waiting_ms << "ms interval");
}
HRESULT MFPAsyncDemuxDataProvider::request_next(HRESULT hr,
DWORD stream_flag,
size_t worker_buffer_count) {
GAPI_LOG_DEBUG(nullptr, "[" << this << "] status: " <<
std::to_string(HRESULT_CODE(hr)) <<
", stream flags: " << stream_flag <<
", worker buffer count: (" << worker_buffer_count <<
"/" << keep_preprocessed_buf_count << ")");
// check gap in stream
if (stream_flag & MF_SOURCE_READERF_STREAMTICK ) {
GAPI_LOG_INFO(nullptr, "[" << this << "] stream gap detected");
return hr;
}
if (FAILED(hr)) {
GAPI_LOG_WARNING(nullptr, "[" << this << "] callback error "
", status: " << std::to_string(HRESULT_CODE(hr)) <<
", stream flags: " << stream_flag);
}
// put on worker buffers available ready
if (worker_buffer_count < keep_preprocessed_buf_count) {
// only one consumer might make submit
if (!submit_read_request.test_and_set()) {
hr = source_reader->ReadSample((DWORD)MF_SOURCE_READER_FIRST_VIDEO_STREAM,
0, NULL, NULL, NULL, NULL);
GAPI_LOG_DEBUG(nullptr, "[" << this << "] submit read sample, status: " <<
std::to_string(HRESULT_CODE(hr)));
}
}
return hr;
}
void MFPAsyncDemuxDataProvider::consume_worker_data() {
// wait callback exchange
std::unique_lock<std::mutex> l(buffer_storage_mutex);
buffer_storage_non_empty_cond.wait(l, [this] {
bool empty = worker_locked_buffer_storage.empty();
if (empty) {
if (!submit_read_request.test_and_set()) {
(void)source_reader->ReadSample((DWORD)MF_SOURCE_READER_FIRST_VIDEO_STREAM,
0, NULL, NULL, NULL, NULL);
}
} else {
worker_key_to_buffer_mapping_storage.swap(processing_key_to_buffer_mapping_storage);
worker_locked_buffer_storage.swap(processing_locked_buffer_storage);
}
return !empty || provider_state == State::Exhausted;
});
}
size_t MFPAsyncDemuxDataProvider::produce_worker_data(void *key,
ComPtrGuard<IMFMediaBuffer> &&buffer,
std::shared_ptr<mfx_bitstream> &&staging_stream) {
size_t bitstream_count = 0;
size_t worker_buffer_count = 0;
{
std::unique_lock<std::mutex> l(buffer_storage_mutex);
// remember sample buffer to keep data safe
worker_key_to_buffer_mapping_storage.emplace(key, std::move(buffer));
worker_buffer_count = worker_key_to_buffer_mapping_storage.size();
// remember bitstream for consuming
worker_locked_buffer_storage.push(std::move(staging_stream));
bitstream_count = worker_locked_buffer_storage.size();
buffer_storage_non_empty_cond.notify_all();
}
GAPI_DbgAssert(worker_buffer_count == bitstream_count &&
"worker_key_to_buffer_mapping_storage & worker_locked_buffer_storage"
" must be the same size" );
GAPI_LOG_DEBUG(nullptr, "[" << this << "] created dmux buffer by key: " <<
key << ", ready bitstream count: " <<
bitstream_count);
return worker_buffer_count;
}
/////////////// IDataProvider methods ///////////////
IDataProvider::mfx_codec_id_type MFPAsyncDemuxDataProvider::get_mfx_codec_id() const {
return codec;
}
bool MFPAsyncDemuxDataProvider::fetch_bitstream_data(std::shared_ptr<mfx_bitstream> &out_bitsream) {
if (empty()) {
GAPI_LOG_DEBUG(nullptr, "[" << this << "] empty");
return false;
}
// utilize consumed bitstream portion allocated at prev step
if (out_bitsream) {
// make dmux buffer unlock for not empty bitstream
GAPI_LOG_DEBUG(nullptr, "[" << this << "] " <<
"bitstream before fetch: " <<
out_bitsream.get() <<
", DataOffset: " <<
out_bitsream->DataOffset <<
", DataLength: " <<
out_bitsream->DataLength);
if (out_bitsream->DataOffset < out_bitsream->DataLength) {
return true;
}
// cleanup
auto it = processing_key_to_buffer_mapping_storage.find(out_bitsream->Data);
if (it == processing_key_to_buffer_mapping_storage.end()) {
GAPI_LOG_WARNING(nullptr, "[" << this << "] " <<
"cannot find appropriate dmux buffer by key: " <<
static_cast<void*>(out_bitsream->Data));
GAPI_Assert(false && "invalid bitstream key");
}
if (it->second) {
it->second->Unlock();
}
processing_key_to_buffer_mapping_storage.erase(it);
}
// consume new bitstream portion
if (processing_locked_buffer_storage.empty() &&
provider_state.load() == State::InProgress) {
// get worker data collected from another thread
consume_worker_data();
}
// EOF check: nothing to process at this point
if (processing_locked_buffer_storage.empty()) {
GAPI_DbgAssert(provider_state == State::Exhausted && "Source reader must be drained");
out_bitsream.reset();
return false;
}
out_bitsream = processing_locked_buffer_storage.front();
processing_locked_buffer_storage.pop();
GAPI_LOG_DEBUG(nullptr, "[" << this << "] "
"bitstream after fetch: " <<
out_bitsream.get() <<
", DataOffset: " <<
out_bitsream->DataOffset <<
", DataLength: " <<
out_bitsream->DataLength);
return true;
}
bool MFPAsyncDemuxDataProvider::empty() const {
return (provider_state.load() == State::Exhausted) &&
(processing_locked_buffer_storage.size() == 0) &&
(get_locked_buffer_size() == 0);
}
#else // _WIN32
MFPAsyncDemuxDataProvider::MFPAsyncDemuxDataProvider(const std::string&) {
GAPI_Assert(false && "Unsupported: Microsoft Media Foundation is not available");
}
IDataProvider::mfx_codec_id_type MFPAsyncDemuxDataProvider::get_mfx_codec_id() const {
GAPI_Assert(false && "Unsupported: Microsoft Media Foundation is not available");
return std::numeric_limits<mfx_codec_id_type>::max();
}
bool MFPAsyncDemuxDataProvider::fetch_bitstream_data(std::shared_ptr<mfx_bitstream> &) {
GAPI_Assert(false && "Unsupported: Microsoft Media Foundation is not available");
return false;
}
bool MFPAsyncDemuxDataProvider::empty() const override {
GAPI_Assert(false && "Unsupported: Microsoft Media Foundation is not available");
return true;
}
#endif // _WIN32
} // namespace onevpl
} // namespace wip
} // namespace gapi
} // namespace cv
#endif // HAVE_ONEVPL

@ -0,0 +1,126 @@
// This file is part of OpenCV project.
// It is subject to the license terms in the LICENSE file found in the top-level directory
// of this distribution and at http://opencv.org/license.html.
//
// Copyright (C) 2021 Intel Corporation
#ifndef GAPI_STREAMING_ONEVPL_DEMUX_ASYNC_MFP_DEMUX_DATA_PROVIDER_HPP
#define GAPI_STREAMING_ONEVPL_DEMUX_ASYNC_MFP_DEMUX_DATA_PROVIDER_HPP
#include <atomic>
#include <condition_variable>
#include <mutex>
#include <queue>
#ifdef HAVE_ONEVPL
#include <vpl/mfxvideo.h>
#ifdef _WIN32
#define NOMINMAX
#include <mfapi.h>
#include <mfidl.h>
#include <mfreadwrite.h>
#include <mfobjects.h>
#include <mftransform.h>
#include <mferror.h>
#include <shlwapi.h>
#include <wmcontainer.h>
#include <wmcodecdsp.h>
#undef NOMINMAX
#include <opencv2/gapi/streaming/onevpl/data_provider_interface.hpp>
#include "streaming/onevpl/data_provider_defines.hpp"
#include "streaming/onevpl/utils.hpp"
namespace cv {
namespace gapi {
namespace wip {
namespace onevpl {
struct GAPI_EXPORTS MFPAsyncDemuxDataProvider : public IDataProvider,
public IMFSourceReaderCallback {
MFPAsyncDemuxDataProvider(const std::string& file_path,
size_t keep_preprocessed_buf_count_value = 3);
~MFPAsyncDemuxDataProvider();
mfx_codec_id_type get_mfx_codec_id() const override;
bool fetch_bitstream_data(std::shared_ptr<mfx_bitstream> &out_bitsream) override;
bool empty() const override;
protected: /* For Unit tests only */
enum class State {
InProgress,
Exhausted
};
// IUnknown methods forbidden for current implementations
STDMETHODIMP QueryInterface(REFIID iid, void** ppv) override;
STDMETHODIMP_(ULONG) AddRef() override;
STDMETHODIMP_(ULONG) Release() override;
// IMFSourceReaderCallback methods
virtual STDMETHODIMP OnReadSample(HRESULT status, DWORD stream_index,
DWORD stream_flag, LONGLONG timestamp,
IMFSample *sample_ptr) override;
STDMETHODIMP OnEvent(DWORD, IMFMediaEvent *) override;
STDMETHODIMP OnFlush(DWORD) override;
// implementation methods
void flush();
HRESULT request_next(HRESULT hr, DWORD stream_flag,
size_t worker_buffer_count);
void consume_worker_data();
virtual size_t produce_worker_data(void *key,
ComPtrGuard<IMFMediaBuffer> &&buffer,
std::shared_ptr<mfx_bitstream> &&staging_stream);
size_t get_locked_buffer_size() const;
private:
static bool select_supported_video_stream(ComPtrGuard<IMFPresentationDescriptor> &descriptor,
mfx_codec_id_type &out_codec_id,
void *source_id);
// members
size_t keep_preprocessed_buf_count;
// COM members
ComPtrGuard<IMFMediaSource> source;
ComPtrGuard<IMFSourceReader> source_reader;
std::atomic<ULONG> com_interface_reference_count;
mfx_codec_id_type codec;
// worker & processing buffers
std::map<void*, ComPtrGuard<IMFMediaBuffer>> worker_key_to_buffer_mapping_storage;
std::map<void*, ComPtrGuard<IMFMediaBuffer>> processing_key_to_buffer_mapping_storage;
std::queue<std::shared_ptr<mfx_bitstream>> worker_locked_buffer_storage;
std::queue<std::shared_ptr<mfx_bitstream>> processing_locked_buffer_storage;
std::condition_variable buffer_storage_non_empty_cond;
mutable std::mutex buffer_storage_mutex;
std::atomic_flag submit_read_request;
std::atomic<State> provider_state;
};
} // namespace onevpl
} // namespace wip
} // namespace gapi
} // namespace cv
#else // _WIN32
namespace cv {
namespace gapi {
namespace wip {
namespace onevpl {
struct GAPI_EXPORTS MFPAsyncDemuxDataProvider : public IDataProvider {
explicit MFPAsyncDemuxDataProvider(const std::string&);
mfx_codec_id_type get_mfx_codec_id() const override;
bool fetch_bitstream_data(std::shared_ptr<mfx_bitstream> &out_bitsream) override;
bool empty() const override;
};
} // namespace onevpl
} // namespace wip
} // namespace gapi
} // namespace cv
#endif // _WIN32
#endif // HAVE_ONEVPL
#endif // GAPI_STREAMING_ONEVPL_DEMUX_ASYNC_MFP_DEMUX_DATA_PROVIDER_HPP

@ -10,6 +10,7 @@
#include <exception>
#include <opencv2/gapi/streaming/onevpl/data_provider_interface.hpp>
#include "streaming/onevpl/data_provider_defines.hpp"
#include "streaming/onevpl/engine/decode/decode_engine_legacy.hpp"
#include "streaming/onevpl/engine/decode/decode_session.hpp"
@ -122,8 +123,14 @@ VPLLegacyDecodeEngine::VPLLegacyDecodeEngine(std::unique_ptr<VPLAccelerationPoli
[this] (EngineSession& sess) -> ExecutionStatus
{
LegacyDecodeSession &my_sess = static_cast<LegacyDecodeSession&>(sess);
my_sess.last_status = ReadEncodedStream(my_sess.stream, my_sess.data_provider);
if (my_sess.last_status != MFX_ERR_NONE) {
if (!my_sess.data_provider) {
my_sess.last_status = MFX_ERR_MORE_DATA;
return ExecutionStatus::Continue;
}
my_sess.last_status = MFX_ERR_NONE;
if (!my_sess.data_provider->fetch_bitstream_data(my_sess.stream)) {
my_sess.last_status = MFX_ERR_MORE_DATA;
my_sess.data_provider.reset(); //close source
}
return ExecutionStatus::Continue;
@ -140,7 +147,7 @@ VPLLegacyDecodeEngine::VPLLegacyDecodeEngine(std::unique_ptr<VPLAccelerationPoli
my_sess.last_status =
MFXVideoDECODE_DecodeFrameAsync(my_sess.session,
my_sess.last_status == MFX_ERR_NONE
? &my_sess.stream
? my_sess.stream.get()
: nullptr, /* No more data to read, start decode draining mode*/
my_sess.procesing_surface_ptr.lock()->get_handle(),
&sync_pair.second,
@ -339,8 +346,10 @@ ProcessingEngineBase::ExecutionStatus VPLLegacyDecodeEngine::process_error(mfxSt
// The decoder detected a new sequence header in the bitstream.
// Video parameters may have changed.
// In external memory allocation case, might need to reallocate the output surface
GAPI_DbgAssert(false && "VPLLegacyDecodeEngine::process_error - "
/*GAPI_DbgAssert(false && "VPLLegacyDecodeEngine::process_error - "
"MFX_WRN_VIDEO_PARAM_CHANGED is not processed");
*/
return ExecutionStatus::Continue;
break;
case MFX_ERR_INCOMPATIBLE_VIDEO_PARAM:
// The function detected that video parameters provided by the application

@ -14,7 +14,7 @@ namespace gapi {
namespace wip {
namespace onevpl {
EngineSession::EngineSession(mfxSession sess, mfxBitstream&& str) :
EngineSession::EngineSession(mfxSession sess, std::shared_ptr<IDataProvider::mfx_bitstream>&& str) :
session(sess), stream(std::move(str)) {}
EngineSession::~EngineSession()
{

@ -15,6 +15,7 @@
#include <vector>
#include "opencv2/gapi/own/exports.hpp" // GAPI_EXPORTS
#include <opencv2/gapi/streaming/onevpl/data_provider_interface.hpp>
#ifdef HAVE_ONEVPL
#include <vpl/mfxvideo.h>
@ -26,17 +27,17 @@ namespace onevpl {
// GAPI_EXPORTS for tests
struct GAPI_EXPORTS DecoderParams {
mfxBitstream stream;
std::shared_ptr<IDataProvider::mfx_bitstream> stream;
mfxVideoParam param;
};
struct GAPI_EXPORTS EngineSession {
mfxSession session;
mfxBitstream stream;
std::shared_ptr<IDataProvider::mfx_bitstream> stream;
mfxSyncPoint sync;
mfxStatus last_status;
EngineSession(mfxSession sess, mfxBitstream&& str);
EngineSession(mfxSession sess, std::shared_ptr<IDataProvider::mfx_bitstream>&& str);
std::string error_code_to_str() const;
virtual ~EngineSession();
};

@ -99,34 +99,6 @@ const VPLAccelerationPolicy* ProcessingEngineBase::get_accel() const {
VPLAccelerationPolicy* ProcessingEngineBase::get_accel() {
return const_cast<VPLAccelerationPolicy*>(static_cast<const ProcessingEngineBase*>(this)->get_accel());
}
// Read encoded stream from file
mfxStatus ReadEncodedStream(mfxBitstream &bs, std::shared_ptr<IDataProvider>& data_provider) {
if (!data_provider) {
return MFX_ERR_MORE_DATA;
}
mfxU8 *p0 = bs.Data;
mfxU8 *p1 = bs.Data + bs.DataOffset;
if (bs.DataOffset > bs.MaxLength - 1) {
return MFX_ERR_NOT_ENOUGH_BUFFER;
}
if (bs.DataLength + bs.DataOffset > bs.MaxLength) {
return MFX_ERR_NOT_ENOUGH_BUFFER;
}
std::copy_n(p1, bs.DataLength, p0);
bs.DataOffset = 0;
bs.DataLength += static_cast<mfxU32>(data_provider->fetch_data(bs.MaxLength - bs.DataLength,
bs.Data + bs.DataLength));
if (bs.DataLength == 0)
return MFX_ERR_MORE_DATA;
return MFX_ERR_NONE;
}
} // namespace onevpl
} // namespace wip
} // namespace gapi

@ -87,9 +87,6 @@ protected:
return sess_impl;
}
};
mfxStatus ReadEncodedStream(mfxBitstream &bs, std::shared_ptr<IDataProvider>& data_provider);
} // namespace onevpl
} // namespace wip
} // namespace gapi

@ -3,44 +3,152 @@
// of this distribution and at http://opencv.org/license.html.
//
// Copyright (C) 2021 Intel Corporation
#include <errno.h>
#include "streaming/onevpl/file_data_provider.hpp"
#include "streaming/onevpl/cfg_params_parser.hpp"
#include "streaming/onevpl/utils.hpp"
#include "logger.hpp"
namespace cv {
namespace gapi {
namespace wip {
namespace onevpl {
FileDataProvider::FileDataProvider(const std::string& file_path) :
source_handle(fopen(file_path.c_str(), "rb"), &fclose) {
#ifdef HAVE_ONEVPL
FileDataProvider::FileDataProvider(const std::string& file_path,
const std::vector<CfgParam> &codec_params,
uint32_t bitstream_data_size_value) :
source_handle(nullptr, &fclose),
bitstream_data_size(bitstream_data_size_value) {
GAPI_LOG_DEBUG(nullptr, "[" << this << "] " <<
"check codec Id from CfgParam, total param count: " <<
codec_params.size());
auto codec_it =
std::find_if(codec_params.begin(), codec_params.end(), [] (const CfgParam& value) {
return value.get_name() == "mfxImplDescription.mfxDecoderDescription.decoder.CodecID";
});
if (codec_it == codec_params.end())
{
GAPI_LOG_WARNING(nullptr, "[" << this << "] " <<
"\"mfxImplDescription.mfxDecoderDescription.decoder.CodecID\" "
"is absent, total param count" << codec_params.size());
throw DataProviderUnsupportedException("\"mfxImplDescription.mfxDecoderDescription.decoder.CodecID\" "
"is required for FileDataProvider");
}
codec = cfg_param_to_mfx_variant(*codec_it).Data.U32;
GAPI_LOG_DEBUG(nullptr, "[" << this << "] " <<
"opening file: " << file_path);
source_handle.reset(fopen(file_path.c_str(), "rb"));
if (!source_handle) {
throw DataProviderSystemErrorException(errno,
"FileDataProvider: cannot open source file: " + file_path);
}
GAPI_LOG_INFO(nullptr, "[" << this << "] " <<
"file: " << file_path << " opened, codec requested: " << mfx_codec_id_to_cstr(codec));
}
FileDataProvider::~FileDataProvider() = default;
size_t FileDataProvider::fetch_data(size_t out_data_bytes_size, void* out_data) {
IDataProvider::mfx_codec_id_type FileDataProvider::get_mfx_codec_id() const {
return codec;
}
bool FileDataProvider::fetch_bitstream_data(std::shared_ptr<mfx_bitstream> &out_bitstream) {
GAPI_LOG_DEBUG(nullptr, "[" << this << "] " <<
", dst: " << out_bitstream.get());
if (empty()) {
return 0;
return false;
}
if (!out_bitstream) {
out_bitstream = std::make_shared<mfx_bitstream>();
out_bitstream->MaxLength = bitstream_data_size;
out_bitstream->Data = (mfxU8 *)calloc(out_bitstream->MaxLength, sizeof(mfxU8));
if(!out_bitstream->Data) {
throw std::runtime_error("Cannot allocate bitstream.Data bytes: " +
std::to_string(out_bitstream->MaxLength * sizeof(mfxU8)));
}
out_bitstream->CodecId = get_mfx_codec_id();
}
GAPI_LOG_DEBUG(nullptr, "[" << this << "] " <<
"bitstream before fetch, DataOffset: " <<
out_bitstream->DataOffset <<
", DataLength: " <<
out_bitstream->DataLength);
mfxU8 *p0 = out_bitstream->Data;
mfxU8 *p1 = out_bitstream->Data + out_bitstream->DataOffset;
if (out_bitstream->DataOffset > out_bitstream->MaxLength - 1) {
throw DataProviderImplementationException(mfxstatus_to_string(MFX_ERR_NOT_ENOUGH_BUFFER));
}
if (out_bitstream->DataLength + out_bitstream->DataOffset > out_bitstream->MaxLength) {
throw DataProviderImplementationException(mfxstatus_to_string(MFX_ERR_NOT_ENOUGH_BUFFER));
}
size_t ret = fread(out_data, 1, out_data_bytes_size, source_handle.get());
if (ret == 0) {
std::copy_n(p1, out_bitstream->DataLength, p0);
out_bitstream->DataOffset = 0;
size_t bytes_count = fread(out_bitstream->Data + out_bitstream->DataLength,
1, out_bitstream->MaxLength - out_bitstream->DataLength,
source_handle.get());
if (bytes_count == 0) {
if (feof(source_handle.get())) {
source_handle.reset();
} else {
throw DataProviderSystemErrorException (errno, "FileDataProvider::fetch_data error read");
throw DataProviderSystemErrorException (errno, "FileDataProvider::fetch_bitstream_data error read");
}
}
return ret;
out_bitstream->DataLength += static_cast<mfxU32>(bytes_count);
GAPI_LOG_DEBUG(nullptr, "bitstream after fetch, DataOffset: " << out_bitstream->DataOffset <<
", DataLength: " << out_bitstream->DataLength);
if (out_bitstream->DataLength == 0)
return false;
GAPI_LOG_DEBUG(nullptr, "[" << this << "] " <<
"buff fetched: " << out_bitstream.get());
return true;
}
bool FileDataProvider::empty() const {
return !source_handle;
}
#else // HAVE_ONEVPL
FileDataProvider::FileDataProvider(const std::string&,
const std::vector<CfgParam> &,
uint32_t bitstream_data_size_value) :
source_handle(nullptr, &fclose),
codec(std::numeric_limits<mfx_codec_id_type>::max()),
bitstream_data_size(bitstream_data_size_value) {
GAPI_Assert(false && "Unsupported: G-API compiled without `WITH_GAPI_ONEVPL=ON`");
}
FileDataProvider::~FileDataProvider() = default;
IDataProvider::mfx_codec_id_type FileDataProvider::get_mfx_codec_id() const {
cv::util::suppress_unused_warning(codec);
GAPI_Assert(false && "Unsupported: G-API compiled without `WITH_GAPI_ONEVPL=ON`");
return codec;
}
bool FileDataProvider::fetch_bitstream_data(std::shared_ptr<mfx_bitstream> &) {
cv::util::suppress_unused_warning(bitstream_data_size);
GAPI_Assert(false && "Unsupported: G-API compiled without `WITH_GAPI_ONEVPL=ON`");
return false;
}
bool FileDataProvider::empty() const {
GAPI_Assert(false && "Unsupported: G-API compiled without `WITH_GAPI_ONEVPL=ON`");
return true;
}
#endif // HAVE_ONEVPL
} // namespace onevpl
} // namespace wip
} // namespace gapi

@ -6,9 +6,13 @@
#ifndef GAPI_STREAMING_ONEVPL_ONEVPL_FILE_DATA_PROVIDER_HPP
#define GAPI_STREAMING_ONEVPL_ONEVPL_FILE_DATA_PROVIDER_HPP
#include <stdio.h>
#include <opencv2/gapi/streaming/onevpl/data_provider_interface.hpp>
#include <opencv2/gapi/streaming/onevpl/cfg_params.hpp>
#include "streaming/onevpl/data_provider_defines.hpp"
namespace cv {
namespace gapi {
@ -17,13 +21,18 @@ namespace onevpl {
struct FileDataProvider : public IDataProvider {
using file_ptr = std::unique_ptr<FILE, decltype(&fclose)>;
FileDataProvider(const std::string& file_path);
FileDataProvider(const std::string& file_path,
const std::vector<CfgParam> &codec_params = {},
uint32_t bitstream_data_size_value = 2000000);
~FileDataProvider();
size_t fetch_data(size_t out_data_bytes_size, void* out_data) override;
mfx_codec_id_type get_mfx_codec_id() const override;
bool fetch_bitstream_data(std::shared_ptr<mfx_bitstream> &out_bitsream) override;
bool empty() const override;
private:
file_ptr source_handle;
mfx_codec_id_type codec;
const uint32_t bitstream_data_size;
};
} // namespace onevpl
} // namespace wip

@ -7,7 +7,7 @@
#include <opencv2/gapi/streaming/onevpl/source.hpp>
#include "streaming/onevpl/source_priv.hpp"
#include "streaming/onevpl/file_data_provider.hpp"
#include "streaming/onevpl/data_provider_dispatcher.hpp"
#include "streaming/onevpl/cfg_param_device_selector.hpp"
namespace cv {
@ -36,7 +36,7 @@ GSource::GSource(const std::string& filePath,
GSource::GSource(const std::string& filePath,
const CfgParams& cfg_params,
std::shared_ptr<IDeviceSelector> selector) :
GSource(std::make_shared<FileDataProvider>(filePath), cfg_params, selector) {
GSource(DataProviderDispatcher::create(filePath, cfg_params), cfg_params, selector) {
if (filePath.empty()) {
util::throw_error(std::logic_error("Cannot create 'GSource' on empty source file name"));
}

@ -12,6 +12,7 @@
#include "streaming/onevpl/accelerators/accel_policy_cpu.hpp"
#include "streaming/onevpl/utils.hpp"
#include "streaming/onevpl/cfg_params_parser.hpp"
#include "streaming/onevpl/data_provider_defines.hpp"
#include "streaming/onevpl/source_priv.hpp"
#include "logger.hpp"
@ -44,7 +45,6 @@ enum {
VPL_NEW_API_MINOR_VERSION = 2
};
GSource::Priv::Priv() :
mfx_handle(MFXLoad()),
mfx_impl_description(),
@ -147,7 +147,7 @@ GSource::Priv::Priv(std::shared_ptr<IDataProvider> provider,
// An available VPL implementation with max matching count
std::vector<CfgParam> impl_params = get_params_from_string<CfgParam>(ss.str());
std::sort(impl_params.begin(), impl_params.end());
GAPI_LOG_DEBUG(nullptr, "Find implementation cfg params count" << impl_params.size());
GAPI_LOG_DEBUG(nullptr, "Find implementation cfg params count: " << impl_params.size());
std::vector<CfgParam> matched_params;
std::set_intersection(impl_params.begin(), impl_params.end(),
@ -195,10 +195,7 @@ GSource::Priv::Priv(std::shared_ptr<IDataProvider> provider,
// initialize decoder
// Find codec ID from config
auto dec_it = std::find_if(cfg_params.begin(), cfg_params.end(), [] (const CfgParam& value) {
return value.get_name() == "mfxImplDescription.mfxDecoderDescription.decoder.CodecID";
});
GAPI_Assert (dec_it != cfg_params.end() && "Cannot determine DecoderID from oneVPL config. Abort");
IDataProvider::mfx_codec_id_type decoder_id = provider->get_mfx_codec_id();
// create session driving engine if required
if (!engine) {
@ -215,7 +212,7 @@ GSource::Priv::Priv(std::shared_ptr<IDataProvider> provider,
}
//create decoder for session accoring to header recovered from source file
DecoderParams decoder_param = create_decoder_from_file(*dec_it, provider);
DecoderParams decoder_param = create_decoder_from_file(decoder_id, provider);
// create engine session for processing mfx session pipeline
engine->initialize_session(mfx_session, std::move(decoder_param),
@ -233,41 +230,42 @@ GSource::Priv::~Priv()
MFXUnload(mfx_handle);
}
DecoderParams GSource::Priv::create_decoder_from_file(const CfgParam& decoder_cfg,
DecoderParams GSource::Priv::create_decoder_from_file(uint32_t decoder_id,
std::shared_ptr<IDataProvider> provider)
{
GAPI_DbgAssert(provider && "Cannot create decoder, data provider is nullptr");
mfxBitstream bitstream{};
const int BITSTREAM_BUFFER_SIZE = 2000000;
bitstream.MaxLength = BITSTREAM_BUFFER_SIZE;
bitstream.Data = (mfxU8 *)calloc(bitstream.MaxLength, sizeof(mfxU8));
if(!bitstream.Data) {
throw std::runtime_error("Cannot allocate bitstream.Data bytes: " +
std::to_string(bitstream.MaxLength * sizeof(mfxU8)));
}
mfxVariant decoder = cfg_param_to_mfx_variant(decoder_cfg);
// according to oneVPL documentation references
// https://spec.oneapi.io/versions/latest/elements/oneVPL/source/API_ref/VPL_disp_api_struct.html
// mfxVariant is an `union` type and considered different meaning for different param ids
// So CodecId has U32 data type
bitstream.CodecId = decoder.Data.U32;
mfxStatus sts = ReadEncodedStream(bitstream, provider);
if(MFX_ERR_NONE != sts) {
throw std::runtime_error("Error reading bitstream, error: " +
mfxstatus_to_string(sts));
}
std::shared_ptr<IDataProvider::mfx_bitstream> bitstream{};
// Retrieve the frame information from input stream
mfxVideoParam mfxDecParams {};
mfxDecParams.mfx.CodecId = decoder.Data.U32;
mfxDecParams.mfx.CodecId = decoder_id;
mfxDecParams.IOPattern = MFX_IOPATTERN_OUT_SYSTEM_MEMORY;//MFX_IOPATTERN_OUT_VIDEO_MEMORY;
sts = MFXVideoDECODE_DecodeHeader(mfx_session, &bitstream, &mfxDecParams);
if(MFX_ERR_NONE != sts) {
throw std::runtime_error("Error decoding header, error: " +
mfxstatus_to_string(sts));
mfxStatus sts = MFX_ERR_NONE;
bool can_fetch_data = false;
do {
can_fetch_data = provider->fetch_bitstream_data(bitstream);
if (!can_fetch_data) {
// must fetch data always because EOF critical at this point
GAPI_LOG_WARNING(nullptr, "cannot decode header from provider: " << provider.get() <<
". Unexpected EOF");
throw std::runtime_error("Error reading bitstream: EOF");
}
sts = MFXVideoDECODE_DecodeHeader(mfx_session, bitstream.get(), &mfxDecParams);
if(MFX_ERR_NONE != sts && MFX_ERR_MORE_DATA != sts) {
throw std::runtime_error("Error decoding header, error: " +
mfxstatus_to_string(sts));
}
} while (sts == MFX_ERR_MORE_DATA && !provider->empty());
if (MFX_ERR_NONE != sts) {
GAPI_LOG_WARNING(nullptr, "cannot decode header from provider: " << provider.get()
<< ". Make sure data source is valid and/or "
"\"mfxImplDescription.mfxDecoderDescription.decoder.CodecID\""
" has correct value in case of demultiplexed raw input");
throw std::runtime_error("Error decode header, error: " +
mfxstatus_to_string(sts));
}
// Input parameters finished, now initialize decode

@ -49,7 +49,7 @@ struct GSource::Priv
GMetaArg descr_of() const;
private:
Priv();
DecoderParams create_decoder_from_file(const CfgParam& decoder,
DecoderParams create_decoder_from_file(uint32_t decoder_id,
std::shared_ptr<IDataProvider> provider);
std::unique_ptr<VPLAccelerationPolicy> initializeHWAccel();

@ -152,6 +152,39 @@ mfxU32 cstr_to_mfx_codec_id(const char* cstr) {
throw std::logic_error(std::string("Cannot parse \"mfxImplDescription.mfxDecoderDescription.decoder.CodecID\" value: ") + cstr);
}
const char* mfx_codec_id_to_cstr(mfxU32 mfx_id) {
switch(mfx_id) {
case MFX_CODEC_AVC:
return "MFX_CODEC_AVC";
case MFX_CODEC_HEVC:
return "MFX_CODEC_HEVC";
case MFX_CODEC_MPEG2:
return "MFX_CODEC_MPEG2";
case MFX_CODEC_VC1:
return "MFX_CODEC_VC1";
case MFX_CODEC_VP9:
return "MFX_CODEC_VP9";
case MFX_CODEC_AV1:
return "MFX_CODEC_AV1";
case MFX_CODEC_JPEG:
return "MFX_CODEC_JPEG";
default:
return "<unsupported>";
}
}
const std::set<mfxU32>& get_supported_mfx_codec_ids()
{
static std::set<mfxU32> supported_codecs({MFX_CODEC_AVC,
MFX_CODEC_HEVC,
MFX_CODEC_MPEG2,
MFX_CODEC_VC1,
MFX_CODEC_VP9,
MFX_CODEC_AV1,
MFX_CODEC_JPEG});
return supported_codecs;
}
const char* mfx_codec_type_to_cstr(const mfxU32 fourcc, const mfxU32 type) {
switch (fourcc) {
case MFX_CODEC_JPEG: {
@ -384,6 +417,10 @@ std::ostream& operator<< (std::ostream& out, const mfxImplDescription& idesc)
return out;
}
std::string mfxstatus_to_string(int64_t err) {
return mfxstatus_to_string(static_cast<mfxStatus>(err));
}
std::string mfxstatus_to_string(mfxStatus err) {
switch(err)
{

@ -17,6 +17,7 @@
#include <map>
#include <memory>
#include <set>
#include <string>
#include <opencv2/gapi/streaming/onevpl/cfg_params.hpp>
@ -61,10 +62,15 @@ mfxResourceType cstr_to_mfx_resource_type(const char* cstr);
mfxU32 cstr_to_mfx_codec_id(const char* cstr);
const char* mfx_codec_id_to_cstr(mfxU32 mfx_id);
const std::set<mfxU32> &get_supported_mfx_codec_ids();
const char* mfx_codec_type_to_cstr(const mfxU32 fourcc, const mfxU32 type);
mfxU32 cstr_to_mfx_version(const char* cstr);
std::string mfxstatus_to_string(int64_t err);
std::string mfxstatus_to_string(mfxStatus err);
std::ostream& operator<< (std::ostream& out, const mfxImplDescription& idesc);

@ -26,6 +26,7 @@
#include <opencv2/gapi/streaming/format.hpp>
#include <opencv2/gapi/streaming/onevpl/source.hpp>
#include "streaming/onevpl/data_provider_defines.hpp"
#ifdef HAVE_ONEVPL
@ -268,13 +269,47 @@ void checkPullOverload(const cv::Mat& ref,
EXPECT_EQ(0., cv::norm(ref, out_mat, cv::NORM_INF));
}
#ifdef HAVE_ONEVPL
struct StreamDataProvider : public cv::gapi::wip::onevpl::IDataProvider {
StreamDataProvider(std::istream& in) : data_stream (in) {
EXPECT_TRUE(in);
}
size_t fetch_data(size_t out_data_size, void* out_data_buf) override {
mfx_codec_id_type get_mfx_codec_id() const override {
return MFX_CODEC_HEVC;
}
bool fetch_bitstream_data(std::shared_ptr<mfx_bitstream> &out_bitstream) override {
if (empty()) {
return false;
}
if (!out_bitstream) {
out_bitstream = std::make_shared<mfx_bitstream>();
out_bitstream->MaxLength = 2000000;
out_bitstream->Data = (mfxU8 *)calloc(out_bitstream->MaxLength, sizeof(mfxU8));
if(!out_bitstream->Data) {
throw std::runtime_error("Cannot allocate bitstream.Data bytes: " +
std::to_string(out_bitstream->MaxLength * sizeof(mfxU8)));
}
out_bitstream->CodecId = get_mfx_codec_id();
}
mfxU8 *p0 = out_bitstream->Data;
mfxU8 *p1 = out_bitstream->Data + out_bitstream->DataOffset;
EXPECT_FALSE(out_bitstream->DataOffset > out_bitstream->MaxLength - 1);
EXPECT_FALSE(out_bitstream->DataLength + out_bitstream->DataOffset > out_bitstream->MaxLength);
std::copy_n(p1, out_bitstream->DataLength, p0);
out_bitstream->DataOffset = 0;
out_bitstream->DataLength += static_cast<mfxU32>(fetch_data(out_bitstream->MaxLength - out_bitstream->DataLength,
out_bitstream->Data + out_bitstream->DataLength));
return out_bitstream->DataLength != 0;
}
size_t fetch_data(size_t out_data_size, void* out_data_buf) {
data_stream.read(reinterpret_cast<char*>(out_data_buf), out_data_size);
return (size_t)data_stream.gcount();
}
@ -284,6 +319,7 @@ struct StreamDataProvider : public cv::gapi::wip::onevpl::IDataProvider {
private:
std::istream& data_stream;
};
#endif // HAVE_ONEVPL
} // anonymous namespace
TEST_P(GAPI_Streaming, SmokeTest_ConstInput_GMat)
@ -2232,7 +2268,7 @@ TEST(OneVPL_Source, Init)
std::stringstream stream(std::ios_base::in | std::ios_base::out | std::ios_base::binary);
EXPECT_TRUE(stream.write(reinterpret_cast<char*>(const_cast<unsigned char *>(hevc_header)),
sizeof(hevc_header)));
std::shared_ptr<cv::gapi::wip::onevpl::IDataProvider> stream_data_provider = std::make_shared<StreamDataProvider>(stream);
auto stream_data_provider = std::make_shared<StreamDataProvider>(stream);
cv::Ptr<cv::gapi::wip::IStreamSource> cap;
bool cap_created = false;

@ -45,12 +45,15 @@ namespace
struct EmptyDataProvider : public cv::gapi::wip::onevpl::IDataProvider {
size_t fetch_data(size_t, void*) override {
return 0;
}
bool empty() const override {
return true;
}
mfx_codec_id_type get_mfx_codec_id() const override {
return std::numeric_limits<uint32_t>::max();
}
bool fetch_bitstream_data(std::shared_ptr<mfx_bitstream> &) override {
return false;
}
};
struct TestProcessingSession : public cv::gapi::wip::onevpl::EngineSession {

@ -0,0 +1,304 @@
// This file is part of OpenCV project.
// It is subject to the license terms in the LICENSE file found in the top-level directory
// of this distribution and at http://opencv.org/license.html.
//
// Copyright (C) 2021 Intel Corporation
#ifdef HAVE_ONEVPL
#include <future>
#include "../test_precomp.hpp"
#include "../common/gapi_tests_common.hpp"
#include "streaming/onevpl/data_provider_dispatcher.hpp"
#include "streaming/onevpl/file_data_provider.hpp"
#include "streaming/onevpl/demux/async_mfp_demux_data_provider.hpp"
#include "streaming/onevpl/source_priv.hpp"
namespace opencv_test
{
namespace
{
using source_t = std::string;
using dd_valid_t = bool;
using demux_valid_t = bool;
using dec_valid_t = bool;
using array_element_t =
std::tuple<source_t, dd_valid_t, demux_valid_t, dec_valid_t>;
array_element_t files[] = {
array_element_t {"highgui/video/VID00003-20100701-2204.3GP",
false, true, false},
array_element_t {"highgui/video/VID00003-20100701-2204.avi",
false, true, false},
array_element_t {"highgui/video/VID00003-20100701-2204.mpg",
false, true, false},
array_element_t {"highgui/video/VID00003-20100701-2204.wmv",
false, true, false},
array_element_t {"highgui/video/sample_322x242_15frames.yuv420p.libaom-av1.mp4",
true, true, true},
array_element_t {"highgui/video/sample_322x242_15frames.yuv420p.libvpx-vp9.mp4",
true, true, true},
array_element_t {"highgui/video/sample_322x242_15frames.yuv420p.libx264.avi",
true, true, true},
array_element_t {"highgui/video/sample_322x242_15frames.yuv420p.libx264.mp4",
true, true, true},
array_element_t {"highgui/video/sample_322x242_15frames.yuv420p.libx265.mp4",
true, true, true},
array_element_t {"highgui/video/sample_322x242_15frames.yuv420p.mjpeg.mp4",
/* MFP cannot extract video MJPEG subtype from that */
false, false, true},
array_element_t {"highgui/video/big_buck_bunny.h264",
false, false, false},
array_element_t {"highgui/video/big_buck_bunny.h265",
false, false, false}
};
class OneVPL_Source_MFPAsyncDispatcherTest : public ::testing::TestWithParam<array_element_t> {};
TEST_P(OneVPL_Source_MFPAsyncDispatcherTest, open_and_decode_file)
{
using namespace cv::gapi::wip::onevpl;
source_t path = findDataFile(std::get<0>(GetParam()));
dd_valid_t dd_result = std::get<1>(GetParam());
dec_valid_t dec_result = std::get<3>(GetParam());
// open demux source & check format support
std::unique_ptr<MFPAsyncDemuxDataProvider> provider_ptr;
try {
provider_ptr.reset(new MFPAsyncDemuxDataProvider(path));
} catch (...) {
EXPECT_FALSE(dd_result);
GTEST_SUCCEED();
return;
}
EXPECT_TRUE(dd_result);
// initialize MFX
mfxLoader mfx_handle = MFXLoad();
mfxConfig cfg_inst_0 = MFXCreateConfig(mfx_handle);
EXPECT_TRUE(cfg_inst_0);
mfxVariant mfx_param_0;
mfx_param_0.Type = MFX_VARIANT_TYPE_U32;
mfx_param_0.Data.U32 = provider_ptr->get_mfx_codec_id();
EXPECT_EQ(MFXSetConfigFilterProperty(cfg_inst_0,(mfxU8 *)"mfxImplDescription.mfxDecoderDescription.decoder.CodecID",
mfx_param_0), MFX_ERR_NONE);
// create MFX session
mfxSession mfx_session{};
mfxStatus sts = MFXCreateSession(mfx_handle, 0, &mfx_session);
EXPECT_EQ(MFX_ERR_NONE, sts);
// create proper bitstream
std::shared_ptr<IDataProvider::mfx_bitstream> bitstream{};
// prepare dec params
mfxVideoParam mfxDecParams {};
mfxDecParams.mfx.CodecId = mfx_param_0.Data.U32;
mfxDecParams.IOPattern = MFX_IOPATTERN_OUT_SYSTEM_MEMORY;
do {
bool fetched = provider_ptr->fetch_bitstream_data(bitstream);
if (dec_result) {
EXPECT_TRUE(fetched);
}
sts = MFXVideoDECODE_DecodeHeader(mfx_session, bitstream.get(), &mfxDecParams);
EXPECT_TRUE(MFX_ERR_NONE == sts || MFX_ERR_MORE_DATA == sts);
} while (sts == MFX_ERR_MORE_DATA && !provider_ptr->empty());
if (dec_result) {
EXPECT_EQ(MFX_ERR_NONE, sts);
} else {
EXPECT_FALSE(MFX_ERR_NONE == sts);
}
MFXVideoDECODE_Close(mfx_session);
MFXClose(mfx_session);
MFXUnload(mfx_handle);
}
TEST_P(OneVPL_Source_MFPAsyncDispatcherTest, choose_dmux_provider)
{
using namespace cv::gapi::wip::onevpl;
source_t path = findDataFile(std::get<0>(GetParam()));
dd_valid_t dd_result = std::get<1>(GetParam());
std::shared_ptr<IDataProvider> provider_ptr;
// choose demux provider for empty CfgParams
try {
provider_ptr = DataProviderDispatcher::create(path);
} catch (...) {
EXPECT_FALSE(dd_result);
provider_ptr = DataProviderDispatcher::create(path,
{ CfgParam::create<std::string>(
"mfxImplDescription.mfxDecoderDescription.decoder.CodecID",
"MFX_CODEC_HEVC") /* Doesn't matter what codec for RAW here*/});
EXPECT_TRUE(std::dynamic_pointer_cast<FileDataProvider>(provider_ptr));
GTEST_SUCCEED();
return;
}
EXPECT_TRUE(dd_result);
EXPECT_TRUE(std::dynamic_pointer_cast<MFPAsyncDemuxDataProvider>(provider_ptr));
}
INSTANTIATE_TEST_CASE_P(MFP_VPL_DecodeHeaderTests, OneVPL_Source_MFPAsyncDispatcherTest,
testing::ValuesIn(files));
namespace test {
struct IntrusiveAsyncDemuxDataProvider :
public cv::gapi::wip::onevpl::MFPAsyncDemuxDataProvider {
using base_t = cv::gapi::wip::onevpl::MFPAsyncDemuxDataProvider;
using base_t::base_t;
~IntrusiveAsyncDemuxDataProvider() {
destroyed = true;
}
STDMETHODIMP OnReadSample(HRESULT status, DWORD stream_index,
DWORD stream_flag, LONGLONG timestamp,
IMFSample *sample_ptr) override {
if (IntrusiveAsyncDemuxDataProvider::need_request_next) {
return base_t::OnReadSample(status, stream_index, stream_flag,
timestamp, sample_ptr);
}
return status;
}
// implementation methods
size_t produce_worker_data(void *key,
cv::gapi::wip::onevpl::ComPtrGuard<IMFMediaBuffer> &&buffer,
std::shared_ptr<mfx_bitstream> &&staging_stream) override {
return base_t::produce_worker_data(key, std::move(buffer),
std::move(staging_stream));
}
static bool need_request_next;
static bool destroyed;
};
bool IntrusiveAsyncDemuxDataProvider::need_request_next{};
bool IntrusiveAsyncDemuxDataProvider::destroyed{};
} // namespace test
TEST(OneVPL_Source_MFPAsyncDemux, sync_flush) {
using namespace cv::gapi::wip::onevpl;
source_t path = findDataFile("highgui/video/sample_322x242_15frames.yuv420p.libx265.mp4");
test::IntrusiveAsyncDemuxDataProvider::need_request_next = false;
const size_t preprocessed_samples_count = 3;
{
test::IntrusiveAsyncDemuxDataProvider provider(path, preprocessed_samples_count);
size_t produce_buffer_count = 199 * preprocessed_samples_count;
std::thread producer([&provider, produce_buffer_count]() {
size_t total_produced_count = 0;
for (size_t i = 0; i < produce_buffer_count; i ++) {
total_produced_count += provider.produce_worker_data(
reinterpret_cast<void*>(i),
createCOMPtrGuard<IMFMediaBuffer>(nullptr),
{});
}
});
producer.join();
}
EXPECT_EQ(test::IntrusiveAsyncDemuxDataProvider::destroyed, true);
}
TEST(OneVPL_Source_MFPAsyncDemux, async_flush) {
using namespace cv::gapi::wip::onevpl;
source_t path = findDataFile("highgui/video/sample_322x242_15frames.yuv420p.libx265.mp4");
test::IntrusiveAsyncDemuxDataProvider::need_request_next = true;
const size_t preprocessed_samples_count = 999;
{
std::shared_ptr<IDataProvider::mfx_bitstream> stream;
test::IntrusiveAsyncDemuxDataProvider provider(path, preprocessed_samples_count);
EXPECT_TRUE(provider.fetch_bitstream_data(stream));
EXPECT_TRUE(stream);
}
EXPECT_EQ(test::IntrusiveAsyncDemuxDataProvider::destroyed, true);
}
TEST(OneVPL_Source_MFPAsyncDemux, eof_async_detection) {
using namespace cv::gapi::wip::onevpl;
source_t path = findDataFile("highgui/video/sample_322x242_15frames.yuv420p.libx265.mp4");
test::IntrusiveAsyncDemuxDataProvider::need_request_next = false;
const size_t preprocessed_samples_count = 0; // do not ask sample at start
test::IntrusiveAsyncDemuxDataProvider provider(path, preprocessed_samples_count);
std::promise<void> start_consume_data;
std::future<void> wait_consume_data = start_consume_data.get_future();
std::thread fetcher([&provider, &start_consume_data]() {
std::shared_ptr<IDataProvider::mfx_bitstream> stream;
start_consume_data.set_value();
EXPECT_FALSE(provider.fetch_bitstream_data(stream));
EXPECT_FALSE(stream);
});
wait_consume_data.wait();
std::this_thread::sleep_for(std::chrono::seconds(2)); // hope fetched has slept on condition
test::IntrusiveAsyncDemuxDataProvider::need_request_next = true;
provider.OnReadSample(S_OK, 0, MF_SOURCE_READERF_ENDOFSTREAM, 0, nullptr);
fetcher.join();
}
TEST(OneVPL_Source_MFPAsyncDemux, produce_consume) {
using namespace cv::gapi::wip::onevpl;
source_t path = findDataFile("highgui/video/sample_322x242_15frames.yuv420p.libx265.mp4");
test::IntrusiveAsyncDemuxDataProvider::need_request_next = false;
const size_t preprocessed_samples_count = 3;
test::IntrusiveAsyncDemuxDataProvider provider(path, preprocessed_samples_count);
std::promise<void> start_consume_data;
std::future<void> wait_consume_data = start_consume_data.get_future();
size_t produce_buffer_count = 199 * preprocessed_samples_count;
std::thread producer([&provider, &wait_consume_data, produce_buffer_count]() {
wait_consume_data.wait();
size_t total_produced_count = 0;
for (size_t i = 0; i < produce_buffer_count; i ++) {
std::shared_ptr<IDataProvider::mfx_bitstream> dummy_stream =
std::make_shared<IDataProvider::mfx_bitstream>();
dummy_stream->DataLength = static_cast<mfxU32>(i); // control block
dummy_stream->DataOffset = static_cast<mfxU32>(i); // control block
dummy_stream->Data = reinterpret_cast<mfxU8*>(i);
total_produced_count = provider.produce_worker_data(
dummy_stream->Data,
createCOMPtrGuard<IMFMediaBuffer>(nullptr),
std::move(dummy_stream));
EXPECT_TRUE(total_produced_count <= produce_buffer_count);
}
});
std::thread consumer([&provider, &start_consume_data, produce_buffer_count]() {
start_consume_data.set_value();
size_t total_consumed_count = 0;
std::shared_ptr<IDataProvider::mfx_bitstream> dummy_stream;
size_t stream_idx = 0;
do {
EXPECT_TRUE(provider.fetch_bitstream_data(dummy_stream));
EXPECT_TRUE(dummy_stream);
EXPECT_EQ(dummy_stream->DataLength, stream_idx);
stream_idx ++;
total_consumed_count++;
} while (total_consumed_count != produce_buffer_count);
});
producer.join();
consumer.join();
}
}
} // namespace opencv_test
#endif // HAVE_ONEVPL
Loading…
Cancel
Save