Merge pull request #21687 from sivanov-work:vpp_pp_integr

G-API: Add VPP preproc CPU/GPU dispatcher

* Add VPP preproc acceleration dispatcher & UTs

* Fix compilation

* Apply some comments
pull/21722/head
Sergey Ivanov 3 years ago committed by GitHub
parent e16cb8b4a2
commit 54733eba6f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      modules/gapi/CMakeLists.txt
  2. 11
      modules/gapi/src/streaming/onevpl/accelerators/surface/base_frame_adapter.cpp
  3. 6
      modules/gapi/src/streaming/onevpl/accelerators/surface/base_frame_adapter.hpp
  4. 2
      modules/gapi/src/streaming/onevpl/accelerators/surface/cpu_frame_adapter.cpp
  5. 2
      modules/gapi/src/streaming/onevpl/accelerators/surface/dx11_frame_adapter.cpp
  6. 85
      modules/gapi/src/streaming/onevpl/engine/preproc/preproc_dispatcher.cpp
  7. 53
      modules/gapi/src/streaming/onevpl/engine/preproc/preproc_dispatcher.hpp
  8. 15
      modules/gapi/src/streaming/onevpl/engine/preproc/preproc_engine.cpp
  9. 6
      modules/gapi/src/streaming/onevpl/engine/preproc/vpp_preproc_defines.hpp
  10. 21
      modules/gapi/src/streaming/onevpl/engine/preproc_defines.hpp
  11. 168
      modules/gapi/test/streaming/gapi_streaming_vpp_preproc_test.cpp

@ -203,6 +203,7 @@ set(gapi_srcs
src/streaming/onevpl/engine/transcode/transcode_session.cpp
src/streaming/onevpl/engine/preproc/preproc_engine.cpp
src/streaming/onevpl/engine/preproc/preproc_session.cpp
src/streaming/onevpl/engine/preproc/preproc_dispatcher.cpp
src/streaming/onevpl/demux/async_mfp_demux_data_provider.cpp
src/streaming/onevpl/data_provider_dispatcher.cpp

@ -15,8 +15,11 @@ namespace cv {
namespace gapi {
namespace wip {
namespace onevpl {
BaseFrameAdapter::BaseFrameAdapter(std::shared_ptr<Surface> surface, SessionHandle assoc_handle):
parent_surface_ptr(surface), parent_handle(assoc_handle) {
BaseFrameAdapter::BaseFrameAdapter(std::shared_ptr<Surface> surface,
SessionHandle assoc_handle,
AccelType accel):
parent_surface_ptr(surface), parent_handle(assoc_handle),
acceleration_type(accel) {
GAPI_Assert(parent_surface_ptr && "Surface is nullptr");
GAPI_Assert(parent_handle && "mfxSession is nullptr");
@ -63,6 +66,10 @@ const BaseFrameAdapter::SessionHandle BaseFrameAdapter::get_session_handle() con
cv::GFrameDesc BaseFrameAdapter::meta() const {
return frame_desc;
}
AccelType BaseFrameAdapter::accel_type() const {
return acceleration_type;
}
} // namespace onevpl
} // namespace wip
} // namespace gapi

@ -9,6 +9,7 @@
#include <memory>
#include <opencv2/gapi/media.hpp>
#include <opencv2/gapi/streaming/onevpl/device_selector_interface.hpp>
#include "streaming/onevpl/accelerators/surface/surface.hpp"
#ifdef HAVE_ONEVPL
@ -25,14 +26,17 @@ public:
const SessionHandle get_session_handle() const;
cv::GFrameDesc meta() const override;
AccelType accel_type() const;
protected:
BaseFrameAdapter(std::shared_ptr<Surface> assoc_surface, SessionHandle assoc_handle);
BaseFrameAdapter(std::shared_ptr<Surface> assoc_surface, SessionHandle assoc_handle,
AccelType accel);
~BaseFrameAdapter();
std::shared_ptr<Surface> surface();
std::shared_ptr<Surface> parent_surface_ptr;
SessionHandle parent_handle;
GFrameDesc frame_desc;
AccelType acceleration_type;
};
} // namespace onevpl
} // namespace wip

@ -18,7 +18,7 @@ namespace onevpl {
VPLMediaFrameCPUAdapter::VPLMediaFrameCPUAdapter(std::shared_ptr<Surface> surface,
SessionHandle assoc_handle):
BaseFrameAdapter(surface, assoc_handle) {
BaseFrameAdapter(surface, assoc_handle, AccelType::HOST) {
}
VPLMediaFrameCPUAdapter::~VPLMediaFrameCPUAdapter() = default;

@ -42,7 +42,7 @@ void unlock_mid(mfxMemId mid, mfxFrameData &data, MediaFrame::Access mode) {
VPLMediaFrameDX11Adapter::VPLMediaFrameDX11Adapter(std::shared_ptr<Surface> assoc_surface,
SessionHandle assoc_handle):
BaseFrameAdapter(assoc_surface, assoc_handle) {
BaseFrameAdapter(assoc_surface, assoc_handle, AccelType::DX11) {
Surface::data_t& data = assoc_surface->get_data();
LockAdapter* alloc_data = reinterpret_cast<LockAdapter*>(data.MemId);

@ -0,0 +1,85 @@
// This file is part of OpenCV project.
// It is subject to the license terms in the LICENSE file found in the top-level directory
// of this distribution and at http://opencv.org/license.html.
//
// Copyright (C) 2022 Intel Corporation
#ifdef HAVE_ONEVPL
#include <algorithm>
#include <exception>
#include <opencv2/gapi/streaming/onevpl/data_provider_interface.hpp>
#include "streaming/onevpl/engine/preproc/preproc_engine.hpp"
#include "streaming/onevpl/engine/preproc/preproc_session.hpp"
#include "streaming/onevpl/engine/preproc/preproc_dispatcher.hpp"
#include "streaming/onevpl/accelerators/accel_policy_interface.hpp"
#include "streaming/onevpl/accelerators/surface/surface.hpp"
#include "streaming/onevpl/cfg_params_parser.hpp"
#include "logger.hpp"
namespace cv {
namespace gapi {
namespace wip {
namespace onevpl {
cv::util::optional<pp_params> VPPPreprocDispatcher::is_applicable(const cv::MediaFrame& in_frame) {
cv::util::optional<pp_params> param;
GAPI_LOG_DEBUG(nullptr, "workers: " << workers.size());
for (const auto &w : workers) {
param = w->is_applicable(in_frame);
if (param.has_value()) {
auto &vpp_param = param.value().get<vpp_pp_params>();
BaseFrameAdapter* adapter = reinterpret_cast<BaseFrameAdapter*>(vpp_param.reserved);
const IDeviceSelector::DeviceScoreTable &devs =
(std::static_pointer_cast<VPPPreprocEngine>(w))->get_accel()->get_device_selector()->select_devices();
GAPI_DbgAssert(devs.size() >= 1 && "Invalid device selector");
auto worker_accel_type = std::get<1>(*devs.begin()).get_type();
GAPI_LOG_DEBUG(nullptr, "acceleration types for frame: " << to_cstring(adapter->accel_type()) <<
", for worker: " << to_cstring(worker_accel_type));
if (worker_accel_type == adapter->accel_type()){
vpp_param.reserved = reinterpret_cast<void *>(w.get());
GAPI_LOG_DEBUG(nullptr, "selected worker: " << vpp_param.reserved);
break;
}
}
}
return param;
}
pp_session VPPPreprocDispatcher::initialize_preproc(const pp_params& initial_frame_param,
const GFrameDesc& required_frame_descr) {
const auto &vpp_param = initial_frame_param.get<vpp_pp_params>();
GAPI_LOG_DEBUG(nullptr, "workers: " << workers.size());
for (auto &w : workers) {
if (reinterpret_cast<void*>(w.get()) == vpp_param.reserved) {
pp_session sess = w->initialize_preproc(initial_frame_param, required_frame_descr);
vpp_pp_session &vpp_sess = sess.get<vpp_pp_session>();
vpp_sess.reserved = reinterpret_cast<void *>(w.get());
GAPI_LOG_DEBUG(nullptr, "initialized session preproc for worker: " << vpp_sess.reserved);
return sess;
}
}
GAPI_Assert(false && "Cannot initialize VPP preproc in dispatcher, no suitable worker");
}
cv::MediaFrame VPPPreprocDispatcher::run_sync(const pp_session &session_handle,
const cv::MediaFrame& in_frame,
const cv::util::optional<cv::Rect> &opt_roi) {
const auto &vpp_sess = session_handle.get<vpp_pp_session>();
GAPI_LOG_DEBUG(nullptr, "workers: " << workers.size());
for (auto &w : workers) {
if (reinterpret_cast<void*>(w.get()) == vpp_sess.reserved) {
GAPI_LOG_DEBUG(nullptr, "trigger execution on worker: " << vpp_sess.reserved);
return w->run_sync(session_handle, in_frame, opt_roi);
}
}
GAPI_Assert(false && "Cannot invoke VPP preproc in dispatcher, no suitable worker");
}
} // namespace onevpl
} // namespace wip
} // namespace gapi
} // namespace cv
#endif // HAVE_ONEVPL

@ -0,0 +1,53 @@
// This file is part of OpenCV project.
// It is subject to the license terms in the LICENSE file found in the top-level directory
// of this distribution and at http://opencv.org/license.html.
//
// Copyright (C) 2022 Intel Corporation
#ifndef GAPI_STREAMING_ONEVPL_PREPROC_DISPATCHER_HPP
#define GAPI_STREAMING_ONEVPL_PREPROC_DISPATCHER_HPP
#include <memory>
#include <vector>
#include "streaming/onevpl/engine/preproc_engine_interface.hpp"
#include "streaming/onevpl/engine/preproc_defines.hpp"
#ifdef HAVE_ONEVPL
#include "streaming/onevpl/onevpl_export.hpp"
namespace cv {
namespace gapi {
namespace wip {
namespace onevpl {
// GAPI_EXPORTS for tests
class GAPI_EXPORTS VPPPreprocDispatcher final : public cv::gapi::wip::IPreprocEngine {
public:
cv::util::optional<pp_params> is_applicable(const cv::MediaFrame& in_frame) override;
pp_session initialize_preproc(const pp_params& initial_frame_param,
const GFrameDesc& required_frame_descr) override;
cv::MediaFrame run_sync(const pp_session &session_handle,
const cv::MediaFrame& in_frame,
const cv::util::optional<cv::Rect> &opt_roi) override;
template<class PreprocImpl, class ...Args>
void insert_worker(Args&& ...args) {
workers.emplace_back(std::make_shared<PreprocImpl>(std::forward<Args>(args)...));
}
size_t size() const {
return workers.size();
}
private:
std::vector<std::shared_ptr<cv::gapi::wip::IPreprocEngine>> workers;
};
} // namespace onevpl
} // namespace wip
} // namespace gapi
} // namespace cv
#endif // HAVE_ONEVPL
#endif // GAPI_STREAMING_ONEVPL_PREPROC_DISPATCHER_HPP

@ -163,7 +163,8 @@ cv::util::optional<pp_params> VPPPreprocEngine::is_applicable(const cv::MediaFra
if (vpl_adapter) {
ret = cv::util::make_optional<pp_params>(
pp_params::create<vpp_pp_params>(vpl_adapter->get_session_handle(),
vpl_adapter->get_surface()->get_info()));
vpl_adapter->get_surface()->get_info(),
vpl_adapter));
GAPI_LOG_DEBUG(nullptr, "VPP preprocessing applicable, session [" <<
vpl_adapter->get_session_handle() << "]");
}
@ -203,7 +204,7 @@ pp_session VPPPreprocEngine::initialize_preproc(const pp_params& initial_frame_p
// check In & Out equally to bypass preproc
if (mfxVPPParams.vpp.Out == mfxVPPParams.vpp.In) {
GAPI_LOG_DEBUG(nullptr, "no preproc required");
return pp_session::create<EngineSession>(nullptr);
return pp_session::create<vpp_pp_session>(nullptr);
}
// recalculate size param according to VPP alignment
@ -221,7 +222,7 @@ pp_session VPPPreprocEngine::initialize_preproc(const pp_params& initial_frame_p
auto it = preproc_session_map.find(mfxVPPParams.vpp.In);
if (it != preproc_session_map.end()) {
GAPI_LOG_DEBUG(nullptr, "[" << it->second->session << "] found");
return pp_session::create(std::static_pointer_cast<EngineSession>(it->second));
return pp_session::create<vpp_pp_session>(std::static_pointer_cast<EngineSession>(it->second));
}
// NB: make some sanity checks
@ -311,7 +312,7 @@ pp_session VPPPreprocEngine::initialize_preproc(const pp_params& initial_frame_p
bool inserted = preproc_session_map.emplace(mfxVPPParams.vpp.In, sess_ptr).second;
GAPI_Assert(inserted && "preproc session is exist");
GAPI_LOG_INFO(nullptr, "VPPPreprocSession created, total sessions: " << preproc_session_map.size());
return pp_session::create(std::static_pointer_cast<EngineSession>(sess_ptr));
return pp_session::create<vpp_pp_session>(std::static_pointer_cast<EngineSession>(sess_ptr));
}
void VPPPreprocEngine::on_frame_ready(session_type& sess,
@ -339,12 +340,12 @@ VPPPreprocEngine::initialize_session(mfxSession,
cv::MediaFrame VPPPreprocEngine::run_sync(const pp_session& sess, const cv::MediaFrame& in_frame,
const cv::util::optional<cv::Rect> &roi) {
std::shared_ptr<EngineSession> pp_sess_impl = sess.get<EngineSession>();
if (!pp_sess_impl) {
vpp_pp_session pp_sess_impl = sess.get<vpp_pp_session>();
if (!pp_sess_impl.handle) {
// bypass case
return in_frame;
}
session_ptr_type s = std::static_pointer_cast<session_type>(pp_sess_impl);
session_ptr_type s = std::static_pointer_cast<session_type>(pp_sess_impl.handle);
GAPI_DbgAssert(s && "Session is nullptr");
GAPI_DbgAssert(is_applicable(in_frame) &&
"VPP preproc is not applicable for the given frame");

@ -18,9 +18,13 @@ namespace onevpl {
struct vpp_pp_params {
mfxSession handle;
mfxFrameInfo info;
void *reserved = nullptr;
};
using vpp_pp_session_ptr = std::shared_ptr<EngineSession>;
struct vpp_pp_session {
std::shared_ptr<EngineSession> handle;
void *reserved = nullptr;
};
} // namespace onevpl
} // namespace wip
} // namespace gapi

@ -19,12 +19,12 @@ namespace wip {
#ifdef VPP_PREPROC_ENGINE
#define GAPI_BACKEND_PP_PARAMS cv::gapi::wip::onevpl::vpp_pp_params
#define GAPI_BACKEND_PP_SESSIONS cv::gapi::wip::onevpl::vpp_pp_session_ptr
#define GAPI_BACKEND_PP_SESSIONS cv::gapi::wip::onevpl::vpp_pp_session
#else // VPP_PREPROC_ENGINE
struct empty_pp_params {};
struct empty_pp_session {};
#define GAPI_BACKEND_PP_PARAMS cv::gapi::wip::empty_pp_params;
#define GAPI_BACKEND_PP_SESSIONS std::shared_ptr<cv::gapi::wip::empty_pp_session>;
#define GAPI_BACKEND_PP_SESSIONS cv::gapi::wip::empty_pp_session;
#endif // VPP_PREPROC_ENGINE
struct pp_params {
@ -57,26 +57,25 @@ private:
struct pp_session {
using value_type = cv::util::variant<GAPI_BACKEND_PP_SESSIONS>;
template<typename BackendSpecificSesionType>
static pp_session create(std::shared_ptr<BackendSpecificSesionType> session) {
static_assert(cv::detail::contains<std::shared_ptr<BackendSpecificSesionType>,
template<typename BackendSpecificSesionType, typename ...Args>
static pp_session create(Args&& ...args) {
static_assert(cv::detail::contains<BackendSpecificSesionType,
GAPI_BACKEND_PP_SESSIONS>::value,
"Invalid BackendSpecificSesionType requested");
pp_session ret;
ret.value = session;
ret.value = BackendSpecificSesionType{std::forward<Args>(args)...};;
return ret;
}
template<typename BackendSpecificSesionType>
std::shared_ptr<BackendSpecificSesionType> get() {
using ptr_type = std::shared_ptr<BackendSpecificSesionType>;
static_assert(cv::detail::contains<ptr_type, GAPI_BACKEND_PP_SESSIONS>::value,
BackendSpecificSesionType &get() {
static_assert(cv::detail::contains<BackendSpecificSesionType, GAPI_BACKEND_PP_SESSIONS>::value,
"Invalid BackendSpecificSesionType requested");
return cv::util::get<ptr_type>(value);
return cv::util::get<BackendSpecificSesionType>(value);
}
template<typename BackendSpecificSesionType>
std::shared_ptr<BackendSpecificSesionType> get() const {
const BackendSpecificSesionType &get() const {
return const_cast<pp_session*>(this)->get<BackendSpecificSesionType>();
}
private:

@ -49,6 +49,7 @@
#include "streaming/onevpl/engine/preproc/preproc_engine.hpp"
#include "streaming/onevpl/engine/preproc/preproc_session.hpp"
#include "streaming/onevpl/engine/preproc/preproc_dispatcher.hpp"
#include "streaming/onevpl/engine/transcode/transcode_engine_legacy.hpp"
#include "streaming/onevpl/engine/transcode/transcode_session.hpp"
@ -279,8 +280,8 @@ TEST(OneVPL_Source_PreprocEngine, functional_single_thread)
pp_session pp_sess = preproc_engine.initialize_preproc(params.value(),
required_frame_param);
ASSERT_EQ(pp_sess.get<EngineSession>().get(),
first_pp_sess.get<EngineSession>().get());
ASSERT_EQ(pp_sess.get<vpp_pp_session>().handle.get(),
first_pp_sess.get<vpp_pp_session>().handle.get());
cv::MediaFrame pp_frame = preproc_engine.run_sync(pp_sess,
decoded_frame,
@ -319,7 +320,7 @@ void decode_function(cv::gapi::wip::onevpl::VPLLegacyDecodeEngine &decode_engine
queue.push_stop();
}
void preproc_function(cv::gapi::wip::onevpl::VPPPreprocEngine &preproc_engine, SafeQueue&queue,
void preproc_function(cv::gapi::wip::IPreprocEngine &preproc_engine, SafeQueue&queue,
size_t &preproc_number, const out_frame_info_t &required_frame_param,
const cv::util::optional<cv::Rect> &roi_rect = {}) {
using namespace cv::gapi::wip;
@ -361,12 +362,15 @@ void preproc_function(cv::gapi::wip::onevpl::VPPPreprocEngine &preproc_engine, S
cv::util::optional<pp_params> params = preproc_engine.is_applicable(decoded_frame);
ASSERT_TRUE(params.has_value());
ASSERT_TRUE(0 == memcmp(&params.value(), &first_pp_params.value(), sizeof(pp_params)));
const auto &vpp_params = params.value().get<vpp_pp_params>();
const auto &first_vpp_params = first_pp_params.value().get<vpp_pp_params>();
ASSERT_EQ(vpp_params.handle, first_vpp_params.handle);
ASSERT_TRUE(0 == memcmp(&vpp_params.info, &first_vpp_params.info, sizeof(mfxFrameInfo)));
pp_session pp_sess = preproc_engine.initialize_preproc(params.value(),
required_frame_param);
ASSERT_EQ(pp_sess.get<EngineSession>().get(),
first_pp_sess.get<EngineSession>().get());
ASSERT_EQ(pp_sess.get<vpp_pp_session>().handle.get(),
first_pp_sess.get<vpp_pp_session>().handle.get());
cv::MediaFrame pp_frame = preproc_engine.run_sync(pp_sess, decoded_frame, empty_roi);
cv::GFrameDesc pp_desc = pp_frame.desc();
@ -381,6 +385,71 @@ void preproc_function(cv::gapi::wip::onevpl::VPPPreprocEngine &preproc_engine, S
ASSERT_NE(preproc_number, 1);
}
void multi_source_preproc_function(size_t source_num,
cv::gapi::wip::IPreprocEngine &preproc_engine, SafeQueue&queue,
size_t &preproc_number, const out_frame_info_t &required_frame_param,
const cv::util::optional<cv::Rect> &roi_rect = {}) {
using namespace cv::gapi::wip;
using namespace cv::gapi::wip::onevpl;
// create preproc session based on frame description & network info
cv::MediaFrame first_decoded_frame = queue.pop();
cv::util::optional<pp_params> first_pp_params = preproc_engine.is_applicable(first_decoded_frame);
ASSERT_TRUE(first_pp_params.has_value());
pp_session first_pp_sess =
preproc_engine.initialize_preproc(first_pp_params.value(),
required_frame_param);
// make preproc using incoming decoded frame & preproc session
cv::MediaFrame first_pp_frame = preproc_engine.run_sync(first_pp_sess,
first_decoded_frame,
roi_rect);
cv::GFrameDesc first_outcome_pp_desc = first_pp_frame.desc();
// do not hold media frames because they share limited DX11 surface pool resources
first_decoded_frame = cv::MediaFrame();
first_pp_frame = cv::MediaFrame();
// launch pipeline
bool in_progress = false;
preproc_number = 1;
size_t received_stop_count = 0;
try {
while(received_stop_count != source_num) {
cv::MediaFrame decoded_frame = queue.pop();
if (SafeQueue::is_stop(decoded_frame)) {
++received_stop_count;
continue;
}
in_progress = true;
cv::util::optional<pp_params> params = preproc_engine.is_applicable(decoded_frame);
ASSERT_TRUE(params.has_value());
pp_session pp_sess = preproc_engine.initialize_preproc(params.value(),
required_frame_param);
cv::MediaFrame pp_frame = preproc_engine.run_sync(pp_sess, decoded_frame, empty_roi);
cv::GFrameDesc pp_desc = pp_frame.desc();
ASSERT_TRUE(pp_desc == first_outcome_pp_desc);
in_progress = false;
decoded_frame = cv::MediaFrame();
preproc_number++;
}
} catch (const std::exception& ex) {
GAPI_LOG_WARNING(nullptr, "Caught exception in preproc worker: " << ex.what());
}
// test if interruption has happened
if (in_progress) {
while (true) {
cv::MediaFrame decoded_frame = queue.pop();
if (SafeQueue::is_stop(decoded_frame)) {
break;
}
}
}
ASSERT_FALSE(in_progress);
ASSERT_NE(preproc_number, 1);
}
using roi_t = cv::util::optional<cv::Rect>;
using preproc_roi_args_t = decltype(std::tuple_cat(std::declval<preproc_args_t>(),
std::declval<std::tuple<roi_t>>()));
@ -548,6 +617,93 @@ TEST_P(VPPInnerPreprocParams, functional_inner_preproc_size)
INSTANTIATE_TEST_CASE_P(OneVPL_Source_PreprocInner, VPPInnerPreprocParams,
testing::ValuesIn(files));
// Dispatcher test suite
class VPPPreprocDispatcherROIParams : public ::testing::TestWithParam<preproc_roi_args_t> {};
TEST_P(VPPPreprocDispatcherROIParams, functional_roi_different_threads)
{
using namespace cv::gapi::wip;
using namespace cv::gapi::wip::onevpl;
source_t file_path;
decoder_t decoder_id;
acceleration_t accel = MFX_ACCEL_MODE_VIA_D3D11;
out_frame_info_t required_frame_param;
roi_t opt_roi;
std::tie(file_path, decoder_id, std::ignore, required_frame_param, opt_roi) = GetParam();
file_path = findDataFile(file_path);
std::vector<CfgParam> cfg_params_w_dx11;
cfg_params_w_dx11.push_back(CfgParam::create_acceleration_mode(accel));
std::unique_ptr<VPLAccelerationPolicy> decode_accel_policy (
new VPLDX11AccelerationPolicy(std::make_shared<CfgParamDeviceSelector>(cfg_params_w_dx11)));
// create file data provider
std::shared_ptr<IDataProvider> data_provider(new FileDataProvider(file_path,
{CfgParam::create_decoder_id(decoder_id)}));
std::shared_ptr<IDataProvider> cpu_data_provider(new FileDataProvider(file_path,
{CfgParam::create_decoder_id(decoder_id)}));
mfxLoader mfx{};
mfxConfig mfx_cfg{};
std::tie(mfx, mfx_cfg) = prepare_mfx(decoder_id, accel);
// create decode session
mfxSession mfx_decode_session{};
mfxStatus sts = MFXCreateSession(mfx, 0, &mfx_decode_session);
EXPECT_EQ(MFX_ERR_NONE, sts);
mfxSession mfx_cpu_decode_session{};
sts = MFXCreateSession(mfx, 0, &mfx_cpu_decode_session);
EXPECT_EQ(MFX_ERR_NONE, sts);
// create decode engines
auto device_selector = decode_accel_policy->get_device_selector();
VPLLegacyDecodeEngine decode_engine(std::move(decode_accel_policy));
auto sess_ptr = decode_engine.initialize_session(mfx_decode_session,
cfg_params_w_dx11,
data_provider);
std::vector<CfgParam> cfg_params_cpu;
auto cpu_device_selector = std::make_shared<CfgParamDeviceSelector>(cfg_params_cpu);
VPLLegacyDecodeEngine cpu_decode_engine(std::unique_ptr<VPLAccelerationPolicy>{
new VPLCPUAccelerationPolicy(cpu_device_selector)});
auto cpu_sess_ptr = cpu_decode_engine.initialize_session(mfx_cpu_decode_session,
cfg_params_cpu,
cpu_data_provider);
// create VPP preproc engines
VPPPreprocDispatcher preproc_dispatcher;
preproc_dispatcher.insert_worker<VPPPreprocEngine>(std::unique_ptr<VPLAccelerationPolicy>{
new VPLDX11AccelerationPolicy(device_selector)});
preproc_dispatcher.insert_worker<VPPPreprocEngine>(std::unique_ptr<VPLAccelerationPolicy>{
new VPLCPUAccelerationPolicy(cpu_device_selector)});
// launch threads
SafeQueue queue;
size_t decoded_number = 1;
size_t cpu_decoded_number = 1;
size_t preproc_number = 0;
std::thread decode_thread(decode_function, std::ref(decode_engine), sess_ptr,
std::ref(queue), std::ref(decoded_number));
std::thread cpu_decode_thread(decode_function, std::ref(cpu_decode_engine), cpu_sess_ptr,
std::ref(queue), std::ref(cpu_decoded_number));
std::thread preproc_thread(multi_source_preproc_function,
preproc_dispatcher.size(),
std::ref(preproc_dispatcher),
std::ref(queue), std::ref(preproc_number),
std::cref(required_frame_param),
std::cref(opt_roi));
decode_thread.join();
cpu_decode_thread.join();
preproc_thread.join();
ASSERT_EQ(preproc_number, decoded_number + cpu_decoded_number);
}
INSTANTIATE_TEST_CASE_P(OneVPL_Source_PreprocDispatcherROI, VPPPreprocDispatcherROIParams,
testing::ValuesIn(files_w_roi));
#endif // HAVE_DIRECTX
#endif // HAVE_D3D11
} // namespace opencv_test

Loading…
Cancel
Save