Merge pull request #20901 from sivanov-work:merge_source_unite_perf_mod

G-API: oneVPL - Performance: Add async decode pipeline & add cached pool

* Add async decode pipeline & intro cached pool

* Fix performance test with checking OPENCV_EXTRA

* Add skip perf test with no VPL

* Fix misprint

* Remove empty line..

* Apply some comments

* Apply some comments

* Make perf test fail if no OPENCV_TEST_DATA_PATH declared
pull/21040/head^2
Sergey Ivanov 3 years ago committed by GitHub
parent 3cfca01372
commit da6344297a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 12
      modules/gapi/CMakeLists.txt
  2. 83
      modules/gapi/perf/streaming/gapi_streaming_source_perf_tests.cpp
  3. 8
      modules/gapi/samples/infer_single_roi.cpp
  4. 72
      modules/gapi/src/streaming/onevpl/accelerators/accel_policy_cpu.cpp
  5. 7
      modules/gapi/src/streaming/onevpl/accelerators/accel_policy_cpu.hpp
  6. 20
      modules/gapi/src/streaming/onevpl/accelerators/surface/surface_pool.cpp
  7. 9
      modules/gapi/src/streaming/onevpl/accelerators/surface/surface_pool.hpp
  8. 106
      modules/gapi/src/streaming/onevpl/engine/decode/decode_engine_legacy.cpp
  9. 3
      modules/gapi/src/streaming/onevpl/engine/decode/decode_engine_legacy.hpp
  10. 9
      modules/gapi/src/streaming/onevpl/engine/decode/decode_session.cpp
  11. 5
      modules/gapi/src/streaming/onevpl/engine/decode/decode_session.hpp
  12. 3
      modules/gapi/src/streaming/onevpl/utils.cpp

@ -297,3 +297,15 @@ if (TARGET example_gapi_onevpl_infer_single_roi)
ocv_target_include_directories(example_gapi_onevpl_infer_single_roi SYSTEM PRIVATE ${OPENCL_INCLUDE_DIRS})
endif()
endif()
# perf test dependencies postprocessing
# Wires oneVPL into the G-API perf test binary: the HAVE_ONEVPL definition,
# the VPL link targets and (optionally) the OpenCL include directories.
if(HAVE_GAPI_ONEVPL)
# NB: TARGET opencv_perf_gapi doesn't exist before `ocv_add_perf_tests`
if(TARGET opencv_perf_gapi)
# HAVE_ONEVPL is the macro the streaming source perf tests are guarded with
ocv_target_compile_definitions(opencv_perf_gapi PRIVATE -DHAVE_ONEVPL)
ocv_target_link_libraries(opencv_perf_gapi PRIVATE ${VPL_IMPORTED_TARGETS})
# OpenCL headers are only added when both D3D11 and OpenCL are available
if(HAVE_D3D11 AND HAVE_OPENCL)
ocv_target_include_directories(opencv_perf_gapi SYSTEM PRIVATE ${OPENCL_INCLUDE_DIRS})
endif()
endif()
endif()

@ -0,0 +1,83 @@
// This file is part of OpenCV project.
// It is subject to the license terms in the LICENSE file found in the top-level directory
// of this distribution and at http://opencv.org/license.html.
//
// Copyright (C) 2021 Intel Corporation
#ifdef HAVE_ONEVPL
#include "../perf_precomp.hpp"
#include "../../test/common/gapi_tests_common.hpp"
#include <opencv2/gapi/streaming/onevpl/source.hpp>
#include <opencv2/gapi/streaming/cap.hpp>
namespace opencv_test
{
using namespace perf;
// Video files used as input; the names are passed through findDataFile()
// by the tests below before being opened.
const std::string files[] = {
"highgui/video/big_buck_bunny.h265",
"highgui/video/big_buck_bunny.h264",
};
// Codec identifiers passed as the decoder "CodecID" configuration value.
// The test instantiation pairs codec[i] with files[i].
const std::string codec[] = {
"MFX_CODEC_HEVC",
"MFX_CODEC_AVC"
};
// Test parameter types: a source file name, a codec name and their pair.
using source_t = std::string;
using codec_t = std::string;
using source_description_t = std::tuple<source_t, codec_t>;
// Fixture for the oneVPL source test, parameterized by (file, codec).
class OneVPLSourcePerfTest : public TestPerfParams<source_description_t> {};
// Fixture for the GCaptureSource test, parameterized by file only.
class VideoCapSourcePerfTest : public TestPerfParams<source_t> {};
// Times repeated `pull()` calls on a oneVPL source built for the
// parameterized (file, codec) pair; no output sanity check is performed.
PERF_TEST_P_(OneVPLSourcePerfTest, TestPerformance)
{
    using cv::gapi::wip::onevpl::CfgParam;

    const auto description = GetParam();
    // Kept as non-const locals so the calls below see the same argument types
    source_t video_path = findDataFile(get<0>(description));
    codec_t codec_id = get<1>(description);

    // Request the hardware implementation and the decoder for `codec_id`
    std::vector<CfgParam> source_cfg {
        CfgParam::create<std::string>("mfxImplDescription.Impl", "MFX_IMPL_TYPE_HARDWARE"),
        CfgParam::create("mfxImplDescription.mfxDecoderDescription.decoder.CodecID", codec_id),
    };

    auto vpl_source = cv::gapi::wip::make_onevpl_src(video_path, source_cfg);
    cv::gapi::wip::Data pulled;

    TEST_CYCLE()
    {
        vpl_source->pull(pulled);
    }

    SANITY_CHECK_NOTHING();
}
// Times repeated `pull()` calls on a GCaptureSource opened on the
// parameterized file; no output sanity check is performed.
PERF_TEST_P_(VideoCapSourcePerfTest, TestPerformance)
{
    namespace wip = cv::gapi::wip;

    // Kept as a non-const local so make_src() sees the same argument type
    source_t video_path = findDataFile(GetParam());
    auto capture_source = wip::make_src<wip::GCaptureSource>(video_path);
    wip::Data pulled;

    TEST_CYCLE()
    {
        capture_source->pull(pulled);
    }

    SANITY_CHECK_NOTHING();
}
// oneVPL source: each file is paired with the codec entry at the same index
INSTANTIATE_TEST_CASE_P(Streaming, OneVPLSourcePerfTest,
Values(source_description_t(files[0], codec[0]),
source_description_t(files[1], codec[1])));
// GCaptureSource: run on the same two files, no codec parameter
INSTANTIATE_TEST_CASE_P(Streaming, VideoCapSourcePerfTest,
Values(files[0],
files[1]));
} // namespace opencv_test
#endif // HAVE_ONEVPL

@ -256,9 +256,17 @@ int main(int argc, char *argv[])
pipeline.start();
cv::Mat out;
int framesCount = 0;
cv::TickMeter t;
t.start();
while (pipeline.pull(cv::gout(out))) {
cv::imshow("Out", out);
cv::waitKey(1);
framesCount++;
}
t.stop();
std::cout << "Elapsed time: " << t.getTimeSec() << std::endl;
std::cout << "FPS: " << framesCount / (t.getTimeSec() ? t.getTimeSec() : 1) << std::endl;
std::cout << "framesCount: " << framesCount << std::endl;
return 0;
}

@ -47,9 +47,8 @@ VPLCPUAccelerationPolicy::create_surface_pool(size_t pool_size, size_t surface_s
surface_ptr_ctr_t creator) {
GAPI_LOG_DEBUG(nullptr, "pool size: " << pool_size << ", surface size bytes: " << surface_size_bytes);
// create empty pool
pool_t pool;
pool.reserve(pool_size);
// NB: create empty pool with reservation
pool_t pool(pool_size);
// allocate workplace memory area
size_t preallocated_raw_bytes = pool_size * surface_size_bytes;
@ -107,7 +106,7 @@ VPLCPUAccelerationPolicy::create_surface_pool(size_t pool_size, size_t surface_s
// remember pool by key
GAPI_LOG_INFO(nullptr, "New pool allocated, key: " << preallocated_pool_memory_ptr <<
", surface count: " << pool.size() <<
", surface count: " << pool.total_size() <<
", surface size bytes: " << surface_size_bytes);
if (!pool_table.emplace(preallocated_pool_memory_ptr, std::move(pool)).second) {
GAPI_LOG_WARNING(nullptr, "Cannot insert pool, table size: " + std::to_string(pool_table.size()) <<
@ -121,33 +120,13 @@ VPLCPUAccelerationPolicy::create_surface_pool(size_t pool_size, size_t surface_s
VPLCPUAccelerationPolicy::surface_weak_ptr_t VPLCPUAccelerationPolicy::get_free_surface(pool_key_t key) {
auto pool_it = pool_table.find(key);
if (pool_it == pool_table.end()) {
throw std::runtime_error("VPLCPUAccelerationPolicy::get_free_surface - "
"key is not found, table size: " +
std::to_string(pool_table.size()));
GAPI_LOG_WARNING(nullptr, "key is not found, table size: " <<
pool_table.size());
GAPI_Assert(false && "Invalid surface key requested in VPLCPUAccelerationPolicy");
}
pool_t& requested_pool = pool_it->second;
#ifdef TEST_PERF
return requested_pool.find_free();
#else // TEST_PERF
auto it =
std::find_if(requested_pool.begin(), requested_pool.end(),
[](const surface_ptr_t& val) {
GAPI_DbgAssert(val && "Pool contains empty surface");
return !val->get_locks_count();
});
// Limitation realloc pool might be a future extension
if (it == requested_pool.end()) {
std::stringstream ss;
ss << "cannot get free surface from pool, key: " << key << ", size: " << requested_pool.size();
const std::string& str = ss.str();
GAPI_LOG_WARNING(nullptr, str);
throw std::runtime_error(std::string(__FUNCTION__) + " - " + str);
}
return *it;
#endif // TEST_PERF
}
size_t VPLCPUAccelerationPolicy::get_free_surface_count(pool_key_t key) const {
@ -157,18 +136,8 @@ size_t VPLCPUAccelerationPolicy::get_free_surface_count(pool_key_t key) const {
", table size: " << pool_table.size());
return 0;
}
#ifdef TEST_PERF
return 0;
#else // TEST_PERF
const pool_t& requested_pool = pool_it->second;
size_t free_surf_count =
std::count_if(requested_pool.begin(), requested_pool.end(),
[](const surface_ptr_t& val) {
GAPI_Assert(val && "Pool contains empty surface");
return !val->get_locks_count();
});
return free_surf_count;
#endif // TEST_PERF
return requested_pool.available_size();
}
size_t VPLCPUAccelerationPolicy::get_surface_count(pool_key_t key) const {
@ -178,11 +147,7 @@ size_t VPLCPUAccelerationPolicy::get_surface_count(pool_key_t key) const {
", table size: " << pool_table.size());
return 0;
}
#ifdef TEST_PERF
return 0;
#else // TEST_PERF
return pool_it->second.size();
#endif // TEST_PERF
return pool_it->second.total_size();
}
cv::MediaFrame::AdapterPtr VPLCPUAccelerationPolicy::create_frame_adapter(pool_key_t key,
@ -197,28 +162,7 @@ cv::MediaFrame::AdapterPtr VPLCPUAccelerationPolicy::create_frame_adapter(pool_k
}
pool_t& requested_pool = pool_it->second;
#ifdef TEST_PERF
return cv::MediaFrame::AdapterPtr{new VPLMediaFrameCPUAdapter(requested_pool.find_by_handle(surface))};
#else // TEST_PERF
auto it =
std::find_if(requested_pool.begin(), requested_pool.end(),
[surface](const surface_ptr_t& val) {
GAPI_DbgAssert(val && "Pool contains empty surface");
return val->get_handle() == surface;
});
// Limitation realloc pool might be a future extension
if (it == requested_pool.end()) {
std::stringstream ss;
ss << "cannot get requested surface from pool, key: " << key << ", surf: "
<< surface << ", pool size: " << requested_pool.size();
const std::string& str = ss.str();
GAPI_LOG_WARNING(nullptr, str);
throw std::runtime_error(std::string(__FUNCTION__) + " - " + str);
}
return cv::MediaFrame::AdapterPtr{new VPLMediaFrameCPUAdapter(*it)};
#endif // TEST_PERF
}
} // namespace onevpl
} // namespace wip

@ -15,9 +15,7 @@
#ifdef HAVE_ONEVPL
#include <vpl/mfxvideo.h>
#include "streaming/onevpl/accelerators/accel_policy_interface.hpp"
#ifdef TEST_PERF
#include "streaming/onevpl/accelerators/surface/surface_pool.hpp"
#endif // TEST_PERF
namespace cv {
namespace gapi {
@ -29,11 +27,8 @@ struct GAPI_EXPORTS VPLCPUAccelerationPolicy final : public VPLAccelerationPolic
{
VPLCPUAccelerationPolicy();
~VPLCPUAccelerationPolicy();
#ifdef TEST_PERF
using pool_t = CachedPool;
#else // TEST_PERF
using pool_t = std::vector<surface_ptr_t>;
#endif // TEST_PERF
void init(session_t session) override;
void deinit(session_t session) override;

@ -9,11 +9,15 @@ namespace gapi {
namespace wip {
namespace onevpl {
// Builds an empty pool whose surface storage has capacity reserved for
// `reserved_size` entries; no surfaces are created here.
CachedPool::CachedPool(size_t reserved_size/* = 0 */) {
    surfaces.reserve(reserved_size);
}
// Reserves capacity in the surface container for `size` entries.
void CachedPool::reserve(size_t size) {
surfaces.reserve(size);
}
size_t CachedPool::size() const {
size_t CachedPool::total_size() const {
return surfaces.size();
}
@ -29,12 +33,22 @@ void CachedPool::push_back(surface_ptr_t &&surf) {
next_free_it = surfaces.begin();
}
size_t CachedPool::available_size() const {
size_t free_surf_count =
std::count_if(surfaces.begin(), surfaces.end(),
[](const surface_ptr_t& val) {
GAPI_DbgAssert(val && "Pool contains empty surface");
return (val->get_locks_count() == 0);
});
return free_surf_count;
}
CachedPool::surface_ptr_t CachedPool::find_free() {
auto it =
std::find_if(next_free_it, surfaces.end(),
[](const surface_ptr_t& val) {
GAPI_DbgAssert(val && "Pool contains empty surface");
return !val->get_locks_count();
return (val->get_locks_count() == 0);
});
// Limitation realloc pool might be a future extension
@ -42,7 +56,7 @@ CachedPool::surface_ptr_t CachedPool::find_free() {
it = std::find_if(surfaces.begin(), next_free_it,
[](const surface_ptr_t& val) {
GAPI_DbgAssert(val && "Pool contains empty surface");
return !val->get_locks_count();
return (val->get_locks_count() == 0);
});
if (it == next_free_it) {
std::stringstream ss;

@ -28,13 +28,18 @@ public:
using free_surface_iterator_t = typename surface_container_t::iterator;
using cached_surface_container_t = std::map<mfxFrameSurface1*, surface_ptr_t>;
explicit CachedPool(size_t reserved_size = 0);
void push_back(surface_ptr_t &&surf);
void reserve(size_t size);
size_t size() const;
size_t total_size() const;
size_t available_size() const;
void clear();
surface_ptr_t find_free();
surface_ptr_t find_by_handle(mfxFrameSurface1* handle);
private:
void reserve(size_t size);
surface_container_t surfaces;
free_surface_iterator_t next_free_it;
cached_surface_container_t cache;

@ -128,35 +128,82 @@ VPLLegacyDecodeEngine::VPLLegacyDecodeEngine(std::unique_ptr<VPLAccelerationPoli
}
return ExecutionStatus::Continue;
},
// 2) enqueue ASYNC decode
// 2) enqueue ASYNC decode operation
[this] (EngineSession& sess) -> ExecutionStatus
{
LegacyDecodeSession &my_sess = static_cast<LegacyDecodeSession&>(sess);
// prepare sync object for new surface
LegacyDecodeSession::op_handle_t sync_pair{};
// enqueue decode operation with current session surface
my_sess.last_status =
MFXVideoDECODE_DecodeFrameAsync(my_sess.session,
my_sess.last_status == MFX_ERR_NONE
? &my_sess.stream
: nullptr, /* No more data to read, start decode draining mode*/
my_sess.procesing_surface_ptr.lock()->get_handle(),
&my_sess.output_surface_ptr,
&my_sess.sync);
&sync_pair.second,
&sync_pair.first);
// process wait-like statuses in-place:
// It had better to use up all VPL decoding resources in pipeline
// as soon as possible. So waiting more free-surface or device free
while (my_sess.last_status == MFX_ERR_MORE_SURFACE ||
my_sess.last_status == MFX_WRN_DEVICE_BUSY) {
try {
if (my_sess.last_status == MFX_ERR_MORE_SURFACE) {
my_sess.swap_surface(*this);
}
my_sess.last_status =
MFXVideoDECODE_DecodeFrameAsync(my_sess.session,
&my_sess.stream,
my_sess.procesing_surface_ptr.lock()->get_handle(),
&sync_pair.second,
&sync_pair.first);
} catch (const std::runtime_error& ex) {
GAPI_LOG_WARNING(nullptr, "[" << my_sess.session <<
"] has no surface, reason: " <<
ex.what());
// TODO it is supposed to place `break;` here
// to simulate `yield`-like behavior.
// Further DX11 integration logic claims more strict rules
// for enqueue surfaces. If no free surface
// is available it had better to wait free one by checking
// for async result than waste time in spinning.
//
// Put it as-is at now to not break
// current compatibility and avoid further merge-conflicts
}
}
if (my_sess.last_status == MFX_ERR_NONE) {
my_sess.sync_queue.emplace(sync_pair);
} else if (my_sess.last_status != MFX_ERR_MORE_DATA) /* suppress MFX_ERR_MORE_DATA warning */ {
GAPI_LOG_WARNING(nullptr, "pending ops count: " << my_sess.sync_queue.size() <<
", sync id: " << sync_pair.first <<
", status: " << mfxstatus_to_string(my_sess.last_status));
}
return ExecutionStatus::Continue;
},
// 3) Wait for ASYNC decode result
[this] (EngineSession& sess) -> ExecutionStatus
{
if (sess.last_status == MFX_ERR_NONE) // Got 1 decoded frame
LegacyDecodeSession& my_sess = static_cast<LegacyDecodeSession&>(sess);
if (!my_sess.sync_queue.empty()) // FIFO: check the oldest async operation complete
{
do {
//TODO try to extract TIMESTAMP
sess.last_status = MFXVideoCORE_SyncOperation(sess.session, sess.sync, 100);
if (MFX_ERR_NONE == sess.last_status) {
LegacyDecodeSession::op_handle_t& pending_op = my_sess.sync_queue.front();
sess.last_status = MFXVideoCORE_SyncOperation(sess.session, pending_op.first, 0);
LegacyDecodeSession& my_sess = static_cast<LegacyDecodeSession&>(sess);
on_frame_ready(my_sess);
}
} while (sess.last_status == MFX_WRN_IN_EXECUTION);
GAPI_LOG_DEBUG(nullptr, "pending ops count: " << my_sess.sync_queue.size() <<
", sync id: " << pending_op.first <<
", status: " << mfxstatus_to_string(my_sess.last_status));
// put frames in ready queue on success
if (MFX_ERR_NONE == sess.last_status) {
on_frame_ready(my_sess, pending_op.second);
}
}
return ExecutionStatus::Continue;
},
@ -223,14 +270,18 @@ ProcessingEngineBase::ExecutionStatus VPLLegacyDecodeEngine::execute_op(operatio
return op(sess);
}
void VPLLegacyDecodeEngine::on_frame_ready(LegacyDecodeSession& sess)
void VPLLegacyDecodeEngine::on_frame_ready(LegacyDecodeSession& sess,
mfxFrameSurface1* ready_surface)
{
GAPI_LOG_DEBUG(nullptr, "[" << sess.session << "], frame ready");
// manage memory ownership rely on acceleration policy
auto frame_adapter = acceleration_policy->create_frame_adapter(sess.decoder_pool_id,
sess.output_surface_ptr);
ready_surface);
ready_frames.emplace(cv::MediaFrame(std::move(frame_adapter)), sess.generate_frame_meta());
// pop away synced out object
sess.sync_queue.pop();
}
ProcessingEngineBase::ExecutionStatus VPLLegacyDecodeEngine::process_error(mfxStatus status, LegacyDecodeSession& sess)
@ -239,7 +290,17 @@ ProcessingEngineBase::ExecutionStatus VPLLegacyDecodeEngine::process_error(mfxSt
switch (status) {
case MFX_ERR_NONE:
return ExecutionStatus::Continue;
{
// prepare sync object for new surface
try {
sess.swap_surface(*this);
return ExecutionStatus::Continue;
} catch (const std::runtime_error& ex) {
GAPI_LOG_WARNING(nullptr, "[" << sess.session << "] error: " << ex.what() <<
"Abort");
// TODO it is supposed to be `break;` here in future PR
}
}
case MFX_ERR_MORE_DATA: // The function requires more bitstream at input before decoding can proceed
if (!sess.data_provider || sess.data_provider->empty()) {
// No more data to drain from decoder, start encode draining mode
@ -256,8 +317,9 @@ ProcessingEngineBase::ExecutionStatus VPLLegacyDecodeEngine::process_error(mfxSt
try {
sess.swap_surface(*this);
return ExecutionStatus::Continue;
} catch (const std::exception& ex) {
} catch (const std::runtime_error& ex) {
GAPI_LOG_WARNING(nullptr, "[" << sess.session << "] error: " << ex.what());
// TODO it is supposed to be `break;` here in future PR
}
break;
}
@ -295,8 +357,18 @@ ProcessingEngineBase::ExecutionStatus VPLLegacyDecodeEngine::process_error(mfxSt
GAPI_DbgAssert(false && "VPLLegacyDecodeEngine::process_error - "
"MFX_ERR_REALLOC_SURFACE is not processed");
break;
case MFX_WRN_IN_EXECUTION:
try {
sess.swap_surface(*this);
return ExecutionStatus::Continue;
} catch (const std::runtime_error& ex) {
GAPI_LOG_WARNING(nullptr, "[" << sess.session << "] error: " << ex.what() <<
"Abort");
// TODO it is supposed to be `break;` here in future PR
}
default:
GAPI_LOG_WARNING(nullptr, "Unknown status code: " << mfxstatus_to_string(status));
GAPI_LOG_WARNING(nullptr, "Unknown status code: " << mfxstatus_to_string(status) <<
", decoded frames: " << sess.decoded_frames_count);
break;
}

@ -38,7 +38,8 @@ private:
ExecutionStatus execute_op(operation_t& op, EngineSession& sess) override;
ExecutionStatus process_error(mfxStatus status, LegacyDecodeSession& sess);
void on_frame_ready(LegacyDecodeSession& sess);
void on_frame_ready(LegacyDecodeSession& sess,
mfxFrameSurface1* ready_surface);
};
} // namespace onevpl
} // namespace wip

@ -27,7 +27,7 @@ LegacyDecodeSession::LegacyDecodeSession(mfxSession sess,
mfx_decoder_param(std::move(decoder_param.param)),
data_provider(std::move(provider)),
procesing_surface_ptr(),
output_surface_ptr(),
sync_queue(),
decoded_frames_count()
{
}
@ -41,16 +41,17 @@ LegacyDecodeSession::~LegacyDecodeSession()
void LegacyDecodeSession::swap_surface(VPLLegacyDecodeEngine& engine) {
VPLAccelerationPolicy* acceleration_policy = engine.get_accel();
GAPI_Assert(acceleration_policy && "Empty acceleration_policy");
auto old_locked = procesing_surface_ptr.lock();
try {
auto cand = acceleration_policy->get_free_surface(decoder_pool_id).lock();
GAPI_LOG_DEBUG(nullptr, "[" << session << "] swap surface"
", old: " << (old_locked ? old_locked->get_handle() : nullptr) <<
", old: " << (!procesing_surface_ptr.expired()
? procesing_surface_ptr.lock()->get_handle()
: nullptr) <<
", new: "<< cand->get_handle());
procesing_surface_ptr = cand;
} catch (const std::exception& ex) {
} catch (const std::runtime_error& ex) {
GAPI_LOG_WARNING(nullptr, "[" << session << "] error: " << ex.what() <<
"Abort");
}

@ -8,6 +8,7 @@
#define GAPI_STREAMING_ONVPL_ENGINE_DECODE_DECODE_SESSION_HPP
#include <stdio.h>
#include <memory>
#include <queue>
#include <opencv2/gapi/streaming/meta.hpp>
@ -48,7 +49,9 @@ private:
mfxFrameAllocRequest request;
std::weak_ptr<Surface> procesing_surface_ptr;
mfxFrameSurface1* output_surface_ptr;
using op_handle_t = std::pair<mfxSyncPoint, mfxFrameSurface1*>;
std::queue<op_handle_t> sync_queue;
int64_t decoded_frames_count;
};

@ -437,7 +437,8 @@ std::string mfxstatus_to_string(mfxStatus err) {
return "MFX_WRN_DEVICE_BUSY";
case MFX_WRN_VIDEO_PARAM_CHANGED:
return "MFX_WRN_VIDEO_PARAM_CHANGED";
case MFX_WRN_IN_EXECUTION:
return "MFX_WRN_IN_EXECUTION";
default:
break;

Loading…
Cancel
Save