mirror of https://github.com/opencv/opencv.git
Merge pull request #21049 from sivanov-work:vpl_dx11_merge
G-API: oneVPL merge DX11 acceleration * Merge DX11 initial * Fold conditions row in MACRO in utils * Inject DeviceSelector * Turn on DeviceSelector in DX11 * Change sharedLock logic & Move FMT checking in FrameAdapter c-tor * Move out NumSuggestFrame to configure params * Drain file source fix * Fix compilation * Force zero initialization of SharedLock * Fix some compiler warnings * Fix integer comparison warnings * Fix integers in sample * Integrate Demux * Fix compilation * Add predefined names for some CfgParam * Trigger CI * Fix MultithreadCtx bug, Add Dx11 GetBlobParam(), Get rid of ATL CComPtr * Fix UT: remove unit test with deprecated video from opencv_extra * Add creators for most usable CfgParam * Eliminate some warnings * Fix warning in GAPI_Assert * Apply comments * Add VPL wrapped header with MSVC pragma to get rid of global warning masking
parent
41d108ead6
commit
5c91f5b71d
49 changed files with 3229 additions and 864 deletions
@ -0,0 +1,404 @@ |
||||
// This file is part of OpenCV project.
|
||||
// It is subject to the license terms in the LICENSE file found in the top-level directory
|
||||
// of this distribution and at http://opencv.org/license.html.
|
||||
//
|
||||
// Copyright (C) 2021 Intel Corporation
|
||||
|
||||
#ifdef HAVE_ONEVPL |
||||
#include "streaming/onevpl/accelerators/dx11_alloc_resource.hpp" |
||||
#include "streaming/onevpl/accelerators/utils/shared_lock.hpp" |
||||
#include "logger.hpp" |
||||
|
||||
#ifdef HAVE_DIRECTX |
||||
#ifdef HAVE_D3D11 |
||||
|
||||
namespace cv { |
||||
namespace gapi { |
||||
namespace wip { |
||||
namespace onevpl { |
||||
|
||||
// Wraps the Lock/Unlock callbacks of a VPL allocator. Only those two
// callbacks are retained; all other interfaces are nulled out below.
LockAdapter::LockAdapter(mfxFrameAllocator origin_allocator) :
    lockable_allocator(origin_allocator),
    impl() {
    GAPI_DbgAssert((lockable_allocator.Lock && lockable_allocator.Unlock) &&
                   "Cannot create LockAdapter for empty origin allocator");

    // abandon unusable c-allocator interfaces
    // because LockAdapter requires Lock & Unlock only
    lockable_allocator.Alloc = nullptr;
    lockable_allocator.Free = nullptr;
    lockable_allocator.pthis = nullptr;
}
||||
|
||||
// Acquires READ (shared) access to the frame identified by `mid` and then
// locks it through the wrapped VPL allocator.
// Returns the shared-lock counter value observed before this acquisition
// (0 when no adaptee SharedLock is attached).
// Aborts via GAPI_Assert when the VPL allocator lock fails.
size_t LockAdapter::read_lock(mfxMemId mid, mfxFrameData &data) {
    size_t prev_lock_count = 0;
    if (impl) {
        prev_lock_count = impl->shared_lock();
    }

    // dispatch to VPL allocator using READ access mode
    mfxStatus sts = MFX_ERR_LOCK_MEMORY;
    try {
        sts = lockable_allocator.Lock(nullptr, mid, &data);
    } catch(...) {
    }

    // adapter will throw error if VPL frame allocator fails
    if (sts != MFX_ERR_NONE) {
        // roll the shared lock back before reporting failure; guard against
        // a null adaptee (the success path above also checks `impl`)
        if (impl) {
            impl->unlock_shared();
        }
        GAPI_Assert(false && "Cannot lock frame on READ using VPL allocator");
    }

    return prev_lock_count;
}
||||
|
||||
// Releases READ (shared) access previously taken by read_lock().
// Returns the adaptee's remaining shared-lock counter (0 with no adaptee).
size_t LockAdapter::unlock_read(mfxMemId mid, mfxFrameData &data) {
    // parenthesize the whole condition so the assertion message string is
    // ANDed with the full predicate, not just its right-hand operand
    GAPI_DbgAssert((!impl || !is_write_acquired()) &&
                   "Reject `unlock_read` in `write_lock` state");
    lockable_allocator.Unlock(nullptr, mid, &data);
    return impl ? impl->unlock_shared() : 0;
}
||||
|
||||
// Acquires WRITE (exclusive) access to the frame identified by `mid` and
// then locks it through the wrapped VPL allocator.
// Aborts via GAPI_Assert when the VPL allocator lock fails.
void LockAdapter::write_lock(mfxMemId mid, mfxFrameData &data) {
    if (impl) {
        // TODO consider using `try_lock` in loop with limited iteration count
        // to prevent dead-lock with WARN at least notification
        impl->lock();
    }

    // dispatch to VPL allocator using WRITE access mode
    mfxStatus sts = MFX_ERR_LOCK_MEMORY;
    try {
        sts = lockable_allocator.Lock(nullptr, mid, &data);
    } catch(...) {
    }

    // adapter will throw error if VPL frame allocator fails
    if (sts != MFX_ERR_NONE) {
        // roll the exclusive lock back before reporting failure; guard
        // against a null adaptee (the lock above was also impl-guarded)
        if (impl) {
            impl->unlock();
        }
        GAPI_Assert(false && "Cannot lock frame on WRITE using VPL allocator");
    }
}
||||
|
||||
bool LockAdapter::is_write_acquired() { |
||||
if(!impl) return true; |
||||
return impl->owns(); |
||||
} |
||||
|
||||
// Releases WRITE (exclusive) access previously taken by write_lock():
// unlocks the VPL allocator first, then the adaptee SharedLock (if any).
void LockAdapter::unlock_write(mfxMemId mid, mfxFrameData &data) {
    GAPI_DbgAssert(is_write_acquired() &&
                   "Reject `unlock_write` for unlocked state");
    lockable_allocator.Unlock(nullptr, mid, &data);
    if (impl) {
        impl->unlock();
    }
}
||||
|
||||
// Attaches a SharedLock implementation (non-owning) and returns the
// previously attached one (may be nullptr). Passing nullptr detaches.
SharedLock* LockAdapter::set_adaptee(SharedLock* new_impl) {
    SharedLock* old_impl = impl;
    impl = new_impl;
    return old_impl;
}

// Returns the currently attached SharedLock (non-owning, may be nullptr).
SharedLock* LockAdapter::get_adaptee() {
    return impl;
}
||||
|
||||
// Wraps the GetHDL callback of a VPL allocator. Only GetHDL is retained;
// all other interfaces are nulled out below.
NativeHandleAdapter::NativeHandleAdapter(mfxFrameAllocator origin_allocator) :
    native_handle_getter(origin_allocator) {
    GAPI_DbgAssert(native_handle_getter.GetHDL &&
                   "Cannot create NativeHandleAdapter for empty origin allocator");

    // abandon unusable c-allocator interfaces
    // because NativeHandleAdapter requires `GetHDL` only
    native_handle_getter.Alloc = nullptr;
    native_handle_getter.Free = nullptr;
    native_handle_getter.Lock = nullptr;
    native_handle_getter.Unlock = nullptr;
    native_handle_getter.pthis = nullptr;
}
||||
|
||||
// Retrieves the platform-native handle backing the VPL memory id `mid`
// via the wrapped allocator's GetHDL. Aborts on allocator failure.
void NativeHandleAdapter::get_handle(mfxMemId mid, mfxHDL& out) {
    if (native_handle_getter.GetHDL(nullptr, mid, &out) != MFX_ERR_NONE) {
        // `false` (not `nullptr`) keeps the assert expression boolean;
        // also fixes the "resourse" typo in the diagnostic message
        GAPI_Assert(false && "Cannot get native handle for resource by mid");
    }
}
||||
|
||||
// Binds one subresource of a shared DX11 texture to the VPL allocator
// callbacks. `origin_allocator` is copied into both mix-in bases:
// LockAdapter (Lock/Unlock) and NativeHandleAdapter (GetHDL).
// The device context is AddRef'd here and Release'd in the destructor.
DX11AllocationItem::DX11AllocationItem(std::weak_ptr<DX11AllocationRecord> parent,
                                       ID3D11DeviceContext* origin_ctx,
                                       mfxFrameAllocator origin_allocator,
                                       ComSharedPtrGuard<ID3D11Texture2D> tex_ptr,
                                       subresource_id_t subtex_id,
                                       ComPtrGuard<ID3D11Texture2D>&& staging_tex_ptr) :
    LockAdapter(origin_allocator),
    NativeHandleAdapter(origin_allocator),
    shared_device_context(origin_ctx),
    texture_ptr(tex_ptr),
    subresource_id(subtex_id),
    staging_texture_ptr(std::move(staging_tex_ptr)),
    observer(parent) {
    GAPI_DbgAssert(texture_ptr &&
                   "Cannot create DX11AllocationItem for empty texture");
    GAPI_DbgAssert(staging_texture_ptr &&
                   "Cannot create DX11AllocationItem for empty staging texture");
    GAPI_DbgAssert(observer.lock() &&
                   "Cannot create DX11AllocationItem for empty parent");

    // keep the device context alive for this item's lifetime;
    // balanced by Release() in ~DX11AllocationItem()
    shared_device_context->AddRef();
}
||||
|
||||
DX11AllocationItem::~DX11AllocationItem() {
    release();
    observer.reset();
    // balances the AddRef() performed in the constructor
    if (shared_device_context) {
        shared_device_context->Release();
    }
}
||||
|
||||
// Logs the item's final state. The textures themselves are freed by the
// COM guard members' destructors; the parent record is only observed here.
void DX11AllocationItem::release() {
    auto parent = observer.lock();
    GAPI_LOG_DEBUG(nullptr, "texture: " << texture_ptr <<
                            ", subresource id: " << subresource_id <<
                            ", parent: " << parent.get());
    // `parent` is only used when debug logging is compiled in
    cv::util::suppress_unused_warning(parent);
}
||||
|
||||
// Raw pointer to the shared (video-memory) texture; non-owning.
ID3D11Texture2D* DX11AllocationItem::get_texture_ptr() {
    return texture_ptr.get();
}

// Raw pointer to this item's CPU-mappable staging texture; non-owning.
ID3D11Texture2D* DX11AllocationItem::get_staging_texture_ptr() {
    return staging_texture_ptr.get();
}

// Index of this item's subresource inside the shared texture array.
DX11AllocationItem::subresource_id_t DX11AllocationItem::get_subresource() const {
    return subresource_id;
}

// Non-owning device context pointer (lifetime held via AddRef in ctor).
ID3D11DeviceContext* DX11AllocationItem::get_device_ctx_ptr() {
    return shared_device_context;
}
||||
|
||||
// elastic_barrier hook: executed by the first thread entering shared (READ)
// access. Copies the video-memory subresource into the staging texture,
// maps the staging texture on the host and publishes NV12 plane pointers
// into `ptr`. The matching unmap happens in on_last_out_impl().
void DX11AllocationItem::on_first_in_impl(mfxFrameData *ptr) {
    D3D11_MAP mapType = D3D11_MAP_READ;
    UINT mapFlags = D3D11_MAP_FLAG_DO_NOT_WAIT;

    shared_device_context->CopySubresourceRegion(get_staging_texture_ptr(), 0,
                                                 0, 0, 0,
                                                 get_texture_ptr(),
                                                 get_subresource(),
                                                 nullptr);
    HRESULT err = S_OK;
    D3D11_MAPPED_SUBRESOURCE lockedRect {};
    do {
        // spin while the GPU still uses the staging resource
        // (DXGI_ERROR_WAS_STILL_DRAWING); abort on any other failure
        err = shared_device_context->Map(get_staging_texture_ptr(), 0, mapType, mapFlags, &lockedRect);
        if (S_OK != err && DXGI_ERROR_WAS_STILL_DRAWING != err) {
            GAPI_LOG_WARNING(nullptr, "Cannot Map staging texture in device context, error: " << std::to_string(HRESULT_CODE(err)));
            GAPI_Assert(false && "Cannot Map staging texture in device context");
        }
    } while (DXGI_ERROR_WAS_STILL_DRAWING == err);

    if (FAILED(err)) {
        GAPI_LOG_WARNING(nullptr, "Cannot lock frame");
        GAPI_Assert(false && "Cannot lock frame");
        return ;
    }

    D3D11_TEXTURE2D_DESC desc {};
    get_texture_ptr()->GetDesc(&desc);
    switch (desc.Format) {
        case DXGI_FORMAT_NV12:
            // NV12 layout: luma plane followed by interleaved chroma plane
            ptr->Pitch = (mfxU16)lockedRect.RowPitch;
            ptr->Y = (mfxU8 *)lockedRect.pData;
            ptr->UV = (mfxU8 *)lockedRect.pData + desc.Height * lockedRect.RowPitch;

            GAPI_Assert(ptr->Y && ptr->UV && "DXGI_FORMAT_NV12 locked frame data is nullptr");
            break;
        default:
            // NOTE(review): on this path the staging texture stays mapped and
            // `ptr` is left untouched; unmap occurs later in
            // on_last_out_impl() — confirm this is intended
            GAPI_LOG_WARNING(nullptr, "Unknown DXGI format: " << desc.Format);
            return;
    }
}
||||
|
||||
// elastic_barrier hook: executed by the last thread leaving shared access.
// Unmaps the staging texture and scrubs the published plane pointers.
void DX11AllocationItem::on_last_out_impl(mfxFrameData *ptr) {
    shared_device_context->Unmap(get_staging_texture_ptr(), 0);
    if (!ptr) {
        return;
    }
    ptr->Pitch = 0;
    ptr->U = ptr->V = ptr->Y = 0;
    ptr->A = ptr->R = ptr->G = ptr->B = 0;
}
||||
|
||||
// Dispatches access acquisition: exclusive path when WRITE access is held,
// shared (elastic-barrier) path otherwise.
mfxStatus DX11AllocationItem::acquire_access(mfxFrameData *ptr) {
    return is_write_acquired() ? exclusive_access_acquire_unsafe(ptr)
                               : shared_access_acquire_unsafe(ptr);
}
||||
|
||||
// Dispatches access release, mirroring acquire_access().
mfxStatus DX11AllocationItem::release_access(mfxFrameData *ptr) {
    return is_write_acquired() ? exclusive_access_release_unsafe(ptr)
                               : shared_access_release_unsafe(ptr);
}
||||
|
||||
// Acquires READ access to the mapped frame data. "unsafe" = the caller must
// already hold the corresponding lock via LockAdapter.
mfxStatus DX11AllocationItem::shared_access_acquire_unsafe(mfxFrameData *ptr) {
    GAPI_LOG_DEBUG(nullptr, "acquire READ lock: " << this);
    GAPI_LOG_DEBUG(nullptr, "texture: " << get_texture_ptr() <<
                            ", sub id: " << get_subresource());
    // shared access requires elastic barrier:
    // the first-in thread maps the resource into host memory,
    // subsequent threads reuse the already-mapped memory
    //
    // exclusive access is prohibited while any shared access has been obtained
    visit_in(ptr);

    // either NV12 (Y + interleaved UV) or planar (Y + U + V) pointers must be set
    if (!(ptr->Y && (ptr->UV || (ptr->U && ptr->V)))) {
        GAPI_LOG_WARNING(nullptr, "No any data obtained: " << this);
        return MFX_ERR_LOCK_MEMORY;
    }
    GAPI_LOG_DEBUG(nullptr, "READ access granted: " << this);
    return MFX_ERR_NONE;
}
||||
|
||||
// Releases READ access. "unsafe" = the caller still holds the LockAdapter
// lock; the elastic barrier decides when to actually unmap.
mfxStatus DX11AllocationItem::shared_access_release_unsafe(mfxFrameData *ptr) {
    GAPI_LOG_DEBUG(nullptr, "releasing READ lock: " << this);
    GAPI_LOG_DEBUG(nullptr, "texture: " << get_texture_ptr() <<
                            ", sub id: " << get_subresource());
    // releasing shared access requires elastic barrier:
    // the last-out thread must unmap the memory then and only then when no
    // more read access is coming. If another read-access goes into the
    // critical section (or is waiting for access) unmapping is skipped
    visit_out(ptr);

    GAPI_LOG_DEBUG(nullptr, "access on READ released: " << this);
    return MFX_ERR_NONE;
}
||||
|
||||
// Acquires WRITE access: maps the staging texture for writing and publishes
// NV12 plane pointers into `ptr`. "unsafe" = the caller must already hold
// the exclusive lock via LockAdapter. Unlike the shared path, failures here
// return MFX_ERR_LOCK_MEMORY instead of aborting.
mfxStatus DX11AllocationItem::exclusive_access_acquire_unsafe(mfxFrameData *ptr) {
    GAPI_LOG_DEBUG(nullptr, "acquire WRITE lock: " << this);
    GAPI_LOG_DEBUG(nullptr, "texture: " << get_texture_ptr() <<
                            ", sub id: " << get_subresource());
    D3D11_MAP mapType = D3D11_MAP_WRITE;
    UINT mapFlags = D3D11_MAP_FLAG_DO_NOT_WAIT;

    HRESULT err = S_OK;
    D3D11_MAPPED_SUBRESOURCE lockedRect {};
    do {
        // spin while the GPU still uses the staging resource
        // (DXGI_ERROR_WAS_STILL_DRAWING); fail on any other error
        err = get_device_ctx_ptr()->Map(get_staging_texture_ptr(), 0, mapType, mapFlags, &lockedRect);
        if (S_OK != err && DXGI_ERROR_WAS_STILL_DRAWING != err) {
            GAPI_LOG_WARNING(nullptr, "Cannot Map staging texture in device context, error: " << std::to_string(HRESULT_CODE(err)));
            return MFX_ERR_LOCK_MEMORY;
        }
    } while (DXGI_ERROR_WAS_STILL_DRAWING == err);

    if (FAILED(err)) {
        GAPI_LOG_WARNING(nullptr, "Cannot lock frame");
        return MFX_ERR_LOCK_MEMORY;
    }

    D3D11_TEXTURE2D_DESC desc {};
    get_texture_ptr()->GetDesc(&desc);
    switch (desc.Format) {
        case DXGI_FORMAT_NV12:
            // NV12 layout: luma plane followed by interleaved chroma plane
            ptr->Pitch = (mfxU16)lockedRect.RowPitch;
            ptr->Y = (mfxU8 *)lockedRect.pData;
            ptr->UV = (mfxU8 *)lockedRect.pData + desc.Height * lockedRect.RowPitch;
            if (!ptr->Y || !ptr->UV) {
                GAPI_LOG_WARNING(nullptr, "DXGI_FORMAT_NV12 locked frame data is nullptr");
                return MFX_ERR_LOCK_MEMORY;
            }
            break;
        default:
            GAPI_LOG_WARNING(nullptr, "Unknown DXGI format: " << desc.Format);
            return MFX_ERR_LOCK_MEMORY;
    }

    GAPI_LOG_DEBUG(nullptr, "WRITE access granted: " << this);
    return MFX_ERR_NONE;
}
||||
|
||||
// Releases WRITE access: unmaps the staging texture, copies the written
// content back into the video-memory subresource and scrubs the published
// plane pointers. "unsafe" = the exclusive lock is still held by the caller.
mfxStatus DX11AllocationItem::exclusive_access_release_unsafe(mfxFrameData *ptr) {
    GAPI_LOG_DEBUG(nullptr, "releasing WRITE lock: " << this);
    GAPI_LOG_DEBUG(nullptr, "texture: " << get_texture_ptr() <<
                            ", sub id: " << get_subresource());

    get_device_ctx_ptr()->Unmap(get_staging_texture_ptr(), 0);

    // propagate host-side writes to the shared video-memory texture
    get_device_ctx_ptr()->CopySubresourceRegion(get_texture_ptr(),
                                                get_subresource(),
                                                0, 0, 0,
                                                get_staging_texture_ptr(), 0,
                                                nullptr);

    if (ptr) {
        ptr->Pitch = 0;
        ptr->U = ptr->V = ptr->Y = 0;
        ptr->A = ptr->R = ptr->G = ptr->B = 0;
    }
    GAPI_LOG_DEBUG(nullptr, "access on WRITE released: " << this);
    return MFX_ERR_NONE;
}
||||
|
||||
DX11AllocationRecord::DX11AllocationRecord() = default;

DX11AllocationRecord::~DX11AllocationRecord() {
    GAPI_LOG_DEBUG(nullptr, "record: " << this <<
                            ", subresources count: " << resources.size());

    // items are stored as owned raw pointers because the VPL `Mid`
    // c-interface needs a contiguous AllocationId array (see data());
    // they are freed exactly here
    for (AllocationId id : resources) {
        delete id;
    }
    resources.clear();

    GAPI_LOG_DEBUG(nullptr, "release final referenced texture: " << texture_ptr.get());
}
||||
|
||||
// Takes ownership of the shared texture and creates one DX11AllocationItem
// per subresource, pairing each with its own staging texture. Called only
// from create() right after construction, so `get_ptr()` is valid here.
void DX11AllocationRecord::init(unsigned int items,
                                ID3D11DeviceContext* origin_ctx,
                                mfxFrameAllocator origin_allocator,
                                ComPtrGuard<ID3D11Texture2D>&& texture,
                                std::vector<ComPtrGuard<ID3D11Texture2D>> &&staging_textures) {
    GAPI_DbgAssert(items != 0 && "Cannot create DX11AllocationRecord with empty items");
    GAPI_DbgAssert(items == staging_textures.size() && "Allocation items count and staging size are not equal");
    GAPI_DbgAssert(origin_ctx &&
                   "Cannot create DX11AllocationItem for empty origin_ctx");
    auto shared_allocator_copy = origin_allocator;
    GAPI_DbgAssert((shared_allocator_copy.Lock && shared_allocator_copy.Unlock) &&
                   "Cannot create DX11AllocationItem for empty origin allocator");

    // abandon unusable c-allocator interfaces
    shared_allocator_copy.Alloc = nullptr;
    shared_allocator_copy.Free = nullptr;
    shared_allocator_copy.pthis = nullptr;


    GAPI_LOG_DEBUG(nullptr, "subresources count: " << items << ", text: " << texture.get());
    resources.reserve(items);
    // no AddRef here, because DX11AllocationRecord receives ownership here
    texture_ptr = createCOMSharedPtrGuard(std::move(texture));
    for(unsigned int i = 0; i < items; i++) {
        resources.emplace_back(new DX11AllocationItem(get_ptr(), origin_ctx, shared_allocator_copy,
                                                      texture_ptr, i, std::move(staging_textures[i])));
    }
}
||||
|
||||
// Shared handle to this record (valid because records are created via
// create(), which always owns them through shared_ptr).
DX11AllocationRecord::Ptr DX11AllocationRecord::get_ptr() {
    return shared_from_this();
}

// Contiguous array of item pointers for the VPL `Mid` c-interface.
DX11AllocationRecord::AllocationId* DX11AllocationRecord::data() {
    return resources.data();
}

// Number of subresource items held by this record.
size_t DX11AllocationRecord::size() const {
    return resources.size();
}
||||
} // namespace onevpl
|
||||
} // namespace wip
|
||||
} // namespace gapi
|
||||
} // namespace cv
|
||||
#endif // HAVE_D3D11
|
||||
#endif // HAVE_DIRECTX
|
||||
#endif // HAVE_ONEVPL
|
@ -0,0 +1,151 @@ |
||||
#ifndef GAPI_STREAMING_ONEVPL_ACCEL_DX11_ALLOC_RESOURCE_HPP |
||||
#define GAPI_STREAMING_ONEVPL_ACCEL_DX11_ALLOC_RESOURCE_HPP |
||||
|
||||
#include <map> |
||||
|
||||
#include "opencv2/gapi/own/exports.hpp" // GAPI_EXPORTS |
||||
#include <opencv2/gapi/util/compiler_hints.hpp> |
||||
|
||||
#ifdef HAVE_ONEVPL |
||||
#include "streaming/onevpl/onevpl_export.hpp" |
||||
#include "streaming/onevpl/accelerators/utils/elastic_barrier.hpp" |
||||
#include "streaming/onevpl/utils.hpp" |
||||
|
||||
#ifdef HAVE_DIRECTX |
||||
#ifdef HAVE_D3D11 |
||||
#pragma comment(lib,"d3d11.lib") |
||||
|
||||
#define D3D11_NO_HELPERS |
||||
#define NOMINMAX |
||||
#include <d3d11.h> |
||||
#include <d3d11_4.h> |
||||
#include <codecvt> |
||||
#include "opencv2/core/directx.hpp" |
||||
#ifdef HAVE_OPENCL |
||||
#include <CL/cl_d3d11.h> |
||||
#endif // HAVE_OPENCL
|
||||
#undef D3D11_NO_HELPERS |
||||
#undef NOMINMAX |
||||
|
||||
namespace cv { |
||||
namespace gapi { |
||||
namespace wip { |
||||
namespace onevpl { |
||||
|
||||
class SharedLock; |
||||
// GAPI_EXPORTS for tests
|
||||
// Wraps a VPL mfxFrameAllocator's Lock/Unlock callbacks and pairs them with
// an optional SharedLock adaptee to provide read (shared) / write
// (exclusive) frame access. Non-copyable/non-movable: holds a raw
// non-owning SharedLock pointer whose lifetime is managed by the caller.
struct GAPI_EXPORTS LockAdapter {
    // NOTE(review): single-argument constructor is implicit — consider
    // marking it `explicit`
    LockAdapter(mfxFrameAllocator origin_allocator);

    // Returns the shared-lock counter value before this acquisition.
    size_t read_lock(mfxMemId mid, mfxFrameData &data);
    // Returns the remaining shared-lock counter (0 with no adaptee).
    size_t unlock_read(mfxMemId mid, mfxFrameData &data);

    void write_lock(mfxMemId mid, mfxFrameData &data);
    // True when exclusive access is held (or when no adaptee is attached).
    bool is_write_acquired();
    void unlock_write(mfxMemId mid, mfxFrameData &data);

    // Attaches/detaches a SharedLock (non-owning); returns the previous one.
    SharedLock* set_adaptee(SharedLock* new_impl);
    SharedLock* get_adaptee();
private:
    LockAdapter(const LockAdapter&) = delete;
    LockAdapter(LockAdapter&&) = delete;
    LockAdapter& operator= (const LockAdapter&) = delete;
    LockAdapter& operator= (LockAdapter&&) = delete;

    mfxFrameAllocator lockable_allocator;
    SharedLock* impl;   // non-owning; nullptr means "no adaptee attached"
};
||||
|
||||
// Wraps a VPL mfxFrameAllocator's GetHDL callback to expose the
// platform-native handle behind a VPL memory id.
struct GAPI_EXPORTS NativeHandleAdapter {
    NativeHandleAdapter(mfxFrameAllocator origin_allocator);

    void get_handle(mfxMemId mid, mfxHDL& out);
private:
    mfxFrameAllocator native_handle_getter;
};
||||
|
||||
struct DX11AllocationRecord; |
||||
// One subresource of a shared DX11 texture plus its CPU-mappable staging
// texture. Combines lock dispatch (LockAdapter), native handle access
// (NativeHandleAdapter) and first-in/last-out map/unmap coordination
// (elastic_barrier). Instances are created only by DX11AllocationRecord.
struct DX11AllocationItem : public LockAdapter,
                            public NativeHandleAdapter,
                            public elastic_barrier<DX11AllocationItem> {
    using subresource_id_t = unsigned int;

    friend struct DX11AllocationRecord;
    friend class elastic_barrier<DX11AllocationItem>;
    ~DX11AllocationItem();

    void release();
    ID3D11Texture2D* get_texture_ptr();
    ID3D11Texture2D* get_staging_texture_ptr();
    DX11AllocationItem::subresource_id_t get_subresource() const;

    ID3D11DeviceContext* get_device_ctx_ptr();

    // public transactional access to resources.
    // implements dispatching through different access acquisition modes.
    // current acquisition mode determined by `LockAdapter` with `is_write_acquired()`
    mfxStatus acquire_access(mfxFrameData *ptr);
    mfxStatus release_access(mfxFrameData *ptr);
private:
    DX11AllocationItem(std::weak_ptr<DX11AllocationRecord> parent,
                       ID3D11DeviceContext* origin_ctx,
                       mfxFrameAllocator origin_allocator,
                       ComSharedPtrGuard<ID3D11Texture2D> texture_ptr,
                       subresource_id_t subresource_id,
                       ComPtrGuard<ID3D11Texture2D>&& staging_tex_ptr);

    // elastic barrier interface impl:
    // the first-in thread maps the staging texture on the host,
    // the last-out thread unmaps it
    void on_first_in_impl(mfxFrameData *ptr);
    void on_last_out_impl(mfxFrameData *ptr);

    mfxStatus shared_access_acquire_unsafe(mfxFrameData *ptr);
    mfxStatus shared_access_release_unsafe(mfxFrameData *ptr);
    mfxStatus exclusive_access_acquire_unsafe(mfxFrameData *ptr);
    mfxStatus exclusive_access_release_unsafe(mfxFrameData *ptr);

    // raw pointer: AddRef'd in the constructor, Release'd in the destructor
    ID3D11DeviceContext* shared_device_context;

    ComSharedPtrGuard<ID3D11Texture2D> texture_ptr;
    subresource_id_t subresource_id = 0;
    ComPtrGuard<ID3D11Texture2D> staging_texture_ptr;
    // weak reference to the parent record; breaks the ownership cycle
    std::weak_ptr<DX11AllocationRecord> observer;
};
||||
|
||||
// Owns a shared DX11 texture and one DX11AllocationItem per subresource.
// Must be created via create() so that shared_from_this() is valid when
// items capture a weak reference to the record.
struct DX11AllocationRecord : public std::enable_shared_from_this<DX11AllocationRecord> {

    using Ptr = std::shared_ptr<DX11AllocationRecord>;

    ~DX11AllocationRecord();

    // Factory: constructs the record and forwards `args` to init().
    template<typename... Args>
    static Ptr create(Args&& ...args) {
        std::shared_ptr<DX11AllocationRecord> record(new DX11AllocationRecord);
        record->init(std::forward<Args>(args)...);
        return record;
    }

    Ptr get_ptr();

    // Raw ptr is required as a part of VPL `Mid` c-interface
    // which requires contiguous memory
    using AllocationId = DX11AllocationItem*;
    AllocationId* data();
    size_t size() const;
private:
    DX11AllocationRecord();
    void init(unsigned int items, ID3D11DeviceContext* origin_ctx,
              mfxFrameAllocator origin_allocator,
              ComPtrGuard<ID3D11Texture2D>&& texture, std::vector<ComPtrGuard<ID3D11Texture2D>> &&staging_textures);

    std::vector<AllocationId> resources;  // owned raw pointers, freed in dtor
    ComSharedPtrGuard<ID3D11Texture2D> texture_ptr;
};
||||
|
||||
} // namespace onevpl
|
||||
} // namespace wip
|
||||
} // namespace gapi
|
||||
} // namespace cv
|
||||
#endif // HAVE_D3D11
|
||||
#endif // HAVE_DIRECTX
|
||||
#endif // HAVE_ONEVPL
|
||||
#endif // GAPI_STREAMING_ONEVPL_ACCEL_DX11_ALLOC_RESOURCE_HPP
|
@ -0,0 +1,232 @@ |
||||
// This file is part of OpenCV project.
|
||||
// It is subject to the license terms in the LICENSE file found in the top-level directory
|
||||
// of this distribution and at http://opencv.org/license.html.
|
||||
//
|
||||
// Copyright (C) 2021 Intel Corporation
|
||||
|
||||
#include "streaming/onevpl/accelerators/surface/dx11_frame_adapter.hpp" |
||||
#include "streaming/onevpl/accelerators/dx11_alloc_resource.hpp" |
||||
#include "streaming/onevpl/accelerators/surface/surface.hpp" |
||||
#include "logger.hpp" |
||||
|
||||
#ifdef HAVE_ONEVPL |
||||
#include "streaming/onevpl/onevpl_export.hpp" |
||||
|
||||
#ifdef HAVE_INF_ENGINE |
||||
// For IE classes (ParamMap, etc)
|
||||
#include <inference_engine.hpp> |
||||
#endif // HAVE_INF_ENGINE
|
||||
|
||||
namespace cv { |
||||
namespace gapi { |
||||
namespace wip { |
||||
namespace onevpl { |
||||
|
||||
// Dispatches a VPL memory-id lock to the LockAdapter packed inside `mid`,
// selecting shared (R) or exclusive (non-R) semantics from `mode`.
void lock_mid(mfxMemId mid, mfxFrameData &data, MediaFrame::Access mode) {
    auto* adapter = reinterpret_cast<LockAdapter*>(mid);
    if (mode == MediaFrame::Access::R) {
        adapter->read_lock(mid, data);
    } else {
        adapter->write_lock(mid, data);
    }
}
||||
|
||||
// Dispatches a VPL memory-id unlock to the LockAdapter packed inside `mid`,
// mirroring lock_mid().
void unlock_mid(mfxMemId mid, mfxFrameData &data, MediaFrame::Access mode) {
    // cast the `mid` parameter itself, consistent with lock_mid(): both are
    // expected to carry the same LockAdapter*, but the parameter is the
    // authoritative id for this call (the original cast data.MemId here)
    LockAdapter* alloc_data = reinterpret_cast<LockAdapter*>(mid);
    if (mode == MediaFrame::Access::R) {
        alloc_data->unlock_read(mid, data);
    } else {
        alloc_data->unlock_write(mid, data);
    }
}
||||
|
||||
// Adapts a VPL surface to cv::MediaFrame. Registers itself as the
// SharedLock adaptee of the surface's LockAdapter (carried in data.MemId)
// and pins the surface via obtain_lock() for the adapter's lifetime.
// Throws std::runtime_error for unsupported FourCC formats.
VPLMediaFrameDX11Adapter::VPLMediaFrameDX11Adapter(std::shared_ptr<Surface> surface):
    parent_surface_ptr(surface) {
    GAPI_Assert(parent_surface_ptr && "Surface is nullptr");

    const Surface::info_t& info = parent_surface_ptr->get_info();
    Surface::data_t& data = parent_surface_ptr->get_data();
    GAPI_LOG_DEBUG(nullptr, "surface: " << parent_surface_ptr->get_handle() <<
                            ", w: " << info.Width << ", h: " << info.Height <<
                            ", p: " << data.Pitch);
    switch(info.FourCC)
    {
        case MFX_FOURCC_I420:
            throw std::runtime_error("MediaFrame doesn't support I420 type");
            break;
        case MFX_FOURCC_NV12:
            frame_desc.fmt = MediaFormat::NV12;
            break;
        default:
            throw std::runtime_error("MediaFrame unknown 'fmt' type: " + std::to_string(info.FourCC));
    }
    frame_desc.size = cv::Size{info.Width, info.Height};

    // data.MemId carries a LockAdapter* (set up by the DX11 allocator);
    // attach this frame as its SharedLock implementation
    LockAdapter* alloc_data = reinterpret_cast<LockAdapter*>(data.MemId);
    alloc_data->set_adaptee(this);

    parent_surface_ptr->obtain_lock();
}
||||
|
||||
VPLMediaFrameDX11Adapter::~VPLMediaFrameDX11Adapter() {
    // Each VPLMediaFrameDX11Adapter releases mfx surface counter
    // The last VPLMediaFrameDX11Adapter releases shared Surface pointer
    // The last surface pointer releases workspace memory

    // detach this frame from the surface's LockAdapter before unpinning
    Surface::data_t& data = parent_surface_ptr->get_data();
    LockAdapter* alloc_data = reinterpret_cast<LockAdapter*>(data.MemId);
    alloc_data->set_adaptee(nullptr);

    parent_surface_ptr->release_lock();
}
||||
|
||||
// Returns the media format/size descriptor captured at construction time.
cv::GFrameDesc VPLMediaFrameDX11Adapter::meta() const {
    return frame_desc;
}
||||
|
||||
// Provides host-visible access to the frame planes.
// Locks the underlying VPL memory id (shared for R, exclusive for W) and
// returns a View whose deleter unlocks it again. The surface shared_ptr is
// copied into the deleter so release may safely outlive the source/pool.
MediaFrame::View VPLMediaFrameDX11Adapter::access(MediaFrame::Access mode) {
    Surface::data_t& data = parent_surface_ptr->get_data();
    const Surface::info_t& info = parent_surface_ptr->get_info();
    void* frame_id = reinterpret_cast<void*>(this);

    GAPI_LOG_DEBUG(nullptr, "START lock frame in surface: " << parent_surface_ptr->get_handle() <<
                            ", frame id: " << frame_id);

    // lock MT
    lock_mid(data.MemId, data, mode);

    GAPI_LOG_DEBUG(nullptr, "FINISH lock frame in surface: " << parent_surface_ptr->get_handle() <<
                            ", frame id: " << frame_id);
    using stride_t = typename cv::MediaFrame::View::Strides::value_type;
    stride_t pitch = static_cast<stride_t>(data.Pitch);

    // NB: make copy for some copyable object, because access release may be happened
    // after source/pool destruction, so we need a copy
    auto parent_surface_ptr_copy = parent_surface_ptr;
    switch(info.FourCC) {
        case MFX_FOURCC_I420:
        {
            // NOTE(review): the constructor throws on MFX_FOURCC_I420, so this
            // branch looks unreachable through this adapter — confirm
            GAPI_Assert(data.Y && data.U && data.V && "MFX_FOURCC_I420 frame data is nullptr");
            cv::MediaFrame::View::Ptrs pp = { data.Y, data.U, data.V, nullptr };
            cv::MediaFrame::View::Strides ss = { pitch, pitch / 2, pitch / 2, 0u };
            return cv::MediaFrame::View(std::move(pp), std::move(ss),
                                        [parent_surface_ptr_copy,
                                         frame_id, mode] () {
                parent_surface_ptr_copy->obtain_lock();

                auto& data = parent_surface_ptr_copy->get_data();
                GAPI_LOG_DEBUG(nullptr, "START unlock frame in surface: " << parent_surface_ptr_copy->get_handle() <<
                                        ", frame id: " << frame_id);
                unlock_mid(data.MemId, data, mode);

                GAPI_LOG_DEBUG(nullptr, "FINISH unlock frame in surface: " << parent_surface_ptr_copy->get_handle() <<
                                        ", frame id: " << frame_id);

                parent_surface_ptr_copy->release_lock();
            });
        }
        case MFX_FOURCC_NV12:
        {
            if (!data.Y || !data.UV) {
                GAPI_LOG_WARNING(nullptr, "Empty data detected!!! for surface: " << parent_surface_ptr->get_handle() <<
                                          ", frame id: " << frame_id);
            }
            GAPI_Assert(data.Y && data.UV && "MFX_FOURCC_NV12 frame data is nullptr");
            cv::MediaFrame::View::Ptrs pp = { data.Y, data.UV, nullptr, nullptr };
            cv::MediaFrame::View::Strides ss = { pitch, pitch, 0u, 0u };
            return cv::MediaFrame::View(std::move(pp), std::move(ss),
                                        [parent_surface_ptr_copy,
                                         frame_id, mode] () {
                parent_surface_ptr_copy->obtain_lock();

                auto& data = parent_surface_ptr_copy->get_data();
                GAPI_LOG_DEBUG(nullptr, "START unlock frame in surface: " << parent_surface_ptr_copy->get_handle() <<
                                        ", frame id: " << frame_id);
                unlock_mid(data.MemId, data, mode);

                GAPI_LOG_DEBUG(nullptr, "FINISH unlock frame in surface: " << parent_surface_ptr_copy->get_handle() <<
                                        ", frame id: " << frame_id);
                parent_surface_ptr_copy->release_lock();
            });
        }
            break;
        default:
            throw std::runtime_error("MediaFrame unknown 'fmt' type: " + std::to_string(info.FourCC));
    }
}
||||
|
||||
// Builds Inference Engine remote-blob parameters for zero-copy inference on
// this frame. Currently disabled: the leading GAPI_Assert(false) always
// fires, so the code below is intentionally dead — kept for upcoming PRs
// (see the assertion message).
cv::util::any VPLMediaFrameDX11Adapter::blobParams() const {
#ifdef HAVE_INF_ENGINE
    GAPI_Assert(false && "VPLMediaFrameDX11Adapter::blobParams() is not fully operable "
                         "in G-API streaming. Please waiting for future PRs");

    Surface::data_t& data = parent_surface_ptr->get_data();
    NativeHandleAdapter* native_handle_getter = reinterpret_cast<NativeHandleAdapter*>(data.MemId);

    mfxHDLPair handle{};
    native_handle_getter->get_handle(data.MemId, reinterpret_cast<mfxHDL&>(handle));

    // handle.first: device object (texture), handle.second: subresource id
    InferenceEngine::ParamMap params{{"SHARED_MEM_TYPE", "VA_SURFACE"},
                                     {"DEV_OBJECT_HANDLE", handle.first},
                                     {"COLOR_FORMAT", InferenceEngine::ColorFormat::NV12},
                                     {"VA_PLANE",
                                      static_cast<DX11AllocationItem::subresource_id_t>(
                                          reinterpret_cast<uint64_t>(
                                              reinterpret_cast<DX11AllocationItem::subresource_id_t *>(
                                                  handle.second)))}};//,
    const Surface::info_t& info = parent_surface_ptr->get_info();
    InferenceEngine::TensorDesc tdesc({InferenceEngine::Precision::U8,
                                       {1, 3, static_cast<size_t>(info.Height),
                                        static_cast<size_t>(info.Width)},
                                       InferenceEngine::Layout::NCHW});
    return std::make_pair(tdesc, params);
#else
    GAPI_Assert(false && "VPLMediaFrameDX11Adapter::blobParams() is not implemented");
#endif // HAVE_INF_ENGINE
}
||||
|
||||
// Serialization of DX11-backed frames is not supported; both entry points
// abort unconditionally.
void VPLMediaFrameDX11Adapter::serialize(cv::gapi::s11n::IOStream&) {
    GAPI_Assert(false && "VPLMediaFrameDX11Adapter::serialize() is not implemented");
}

void VPLMediaFrameDX11Adapter::deserialize(cv::gapi::s11n::IIStream&) {
    GAPI_Assert(false && "VPLMediaFrameDX11Adapter::deserialize() is not implemented");
}
||||
|
||||
// Maps an MFX FourCC code to the corresponding DXGI texture format.
// Returns DXGI_FORMAT_UNKNOWN for unsupported codes.
// Fix: the original also listed `case DXGI_FORMAT_AYUV:` — a DXGI enum
// value, not an MFX FourCC — in this switch over `mfx_fourcc`, which
// silently mapped the unrelated numeric value to AYUV; that label is removed.
DXGI_FORMAT VPLMediaFrameDX11Adapter::get_dx11_color_format(uint32_t mfx_fourcc) {
    switch (mfx_fourcc) {
        case MFX_FOURCC_NV12:
            return DXGI_FORMAT_NV12;

        case MFX_FOURCC_YUY2:
            return DXGI_FORMAT_YUY2;

        case MFX_FOURCC_RGB4:
            return DXGI_FORMAT_B8G8R8A8_UNORM;

        case MFX_FOURCC_P8:
        case MFX_FOURCC_P8_TEXTURE:
            return DXGI_FORMAT_P8;

        case MFX_FOURCC_ARGB16:
        case MFX_FOURCC_ABGR16:
            return DXGI_FORMAT_R16G16B16A16_UNORM;

        case MFX_FOURCC_P010:
            return DXGI_FORMAT_P010;

        case MFX_FOURCC_A2RGB10:
            return DXGI_FORMAT_R10G10B10A2_UNORM;

        case MFX_FOURCC_AYUV:
            return DXGI_FORMAT_AYUV;

        default:
            return DXGI_FORMAT_UNKNOWN;
    }
}
||||
} // namespace onevpl
|
||||
} // namespace wip
|
||||
} // namespace gapi
|
||||
} // namespace cv
|
||||
#endif // HAVE_ONEVPL
|
@ -0,0 +1,63 @@ |
||||
// This file is part of OpenCV project.
|
||||
// It is subject to the license terms in the LICENSE file found in the top-level directory
|
||||
// of this distribution and at http://opencv.org/license.html.
|
||||
//
|
||||
// Copyright (C) 2021 Intel Corporation
|
||||
|
||||
#ifndef GAPI_STREAMING_ONEVPL_ACCELERATORS_SURFACE_DX11_FRAME_ADAPTER_HPP |
||||
#define GAPI_STREAMING_ONEVPL_ACCELERATORS_SURFACE_DX11_FRAME_ADAPTER_HPP |
||||
#include <memory> |
||||
|
||||
#include <opencv2/gapi/media.hpp> |
||||
#include "opencv2/gapi/own/exports.hpp" // GAPI_EXPORTS |
||||
|
||||
#include "streaming/onevpl/accelerators/utils/shared_lock.hpp" |
||||
#ifdef HAVE_ONEVPL |
||||
#include "streaming/onevpl/onevpl_export.hpp" |
||||
|
||||
#ifdef HAVE_DIRECTX |
||||
#ifdef HAVE_D3D11 |
||||
#define D3D11_NO_HELPERS |
||||
#define NOMINMAX |
||||
#include <d3d11.h> |
||||
#include <codecvt> |
||||
#include "opencv2/core/directx.hpp" |
||||
#ifdef HAVE_OPENCL |
||||
#include <CL/cl_d3d11.h> |
||||
#endif |
||||
|
||||
namespace cv { |
||||
namespace gapi { |
||||
namespace wip { |
||||
namespace onevpl { |
||||
|
||||
class Surface;

// cv::MediaFrame adapter over a oneVPL surface backed by a DX11 texture.
// Also derives from SharedLock — presumably to synchronize concurrent View
// accesses against allocator operations (NOTE(review): confirm against the
// .cpp implementation).
class VPLMediaFrameDX11Adapter final: public cv::MediaFrame::IAdapter,
                                      public SharedLock {
public:
    // GAPI_EXPORTS for tests
    GAPI_EXPORTS VPLMediaFrameDX11Adapter(std::shared_ptr<Surface> assoc_surface);
    GAPI_EXPORTS ~VPLMediaFrameDX11Adapter();

    // Format/size description of the wrapped surface.
    cv::GFrameDesc meta() const override;
    MediaFrame::View access(MediaFrame::Access) override;

    // The default implementation does nothing
    cv::util::any blobParams() const override;
    void serialize(cv::gapi::s11n::IOStream&) override;
    void deserialize(cv::gapi::s11n::IIStream&) override;

    // Maps an MFX FOURCC code onto the matching DXGI texture format;
    // returns DXGI_FORMAT_UNKNOWN for unsupported codes.
    static DXGI_FORMAT get_dx11_color_format(uint32_t mfx_fourcc);
private:
    std::shared_ptr<Surface> parent_surface_ptr; // keeps the surface alive
    mfxFrameAllocator allocator;
    GFrameDesc frame_desc;
};
||||
} // namespace onevpl
|
||||
} // namespace wip
|
||||
} // namespace gapi
|
||||
} // namespace cv
|
||||
#undef NOMINMAX |
||||
#endif // HAVE_D3D11
|
||||
#endif // HAVE_DIRECTX
|
||||
#endif // HAVE_ONEVPL
|
||||
#endif // GAPI_STREAMING_ONEVPL_ACCELERATORS_SURFACE_DX11_FRAME_ADAPTER_HPP
|
@ -0,0 +1,296 @@ |
||||
// This file is part of OpenCV project.
|
||||
// It is subject to the license terms in the LICENSE file found in the top-level directory
|
||||
// of this distribution and at http://opencv.org/license.html.
|
||||
//
|
||||
// Copyright (C) 2021 Intel Corporation
|
||||
|
||||
#ifndef GAPI_STREAMING_ONEVPL_ACCELERATORS_UTILS_ELASTIC_BARRIER_HPP |
||||
#define GAPI_STREAMING_ONEVPL_ACCELERATORS_UTILS_ELASTIC_BARRIER_HPP |
||||
#include <atomic> |
||||
|
||||
namespace cv { |
||||
namespace gapi { |
||||
namespace wip { |
||||
namespace onevpl { |
||||
|
||||
// CRTP helper which guarantees that, among any number of concurrent
// visitors, exactly one performs resource initialization (the first one in)
// and exactly one performs deinitialization (the last one out, provided no
// newcomers are waiting). Impl must provide on_first_in_impl()/
// on_last_out_impl(); both are invoked in a single-threaded context.
template<typename Impl>
class elastic_barrier {
public:
    using self_t = Impl;

    elastic_barrier() :
        incoming_requests(),
        outgoing_requests(),
        pending_requests(),
        reinit(false) {
    }

    self_t* get_self() {
        return static_cast<self_t*>(this);
    }

    // Check a visitor in; may trigger Impl::on_first_in_impl() exactly once
    // per "drained -> populated" transition.
    template<typename ...Args>
    void visit_in (Args&& ...args) {
        on_lock(std::forward<Args>(args)...);
    }

    // Check a visitor out; may trigger Impl::on_last_out_impl() exactly once
    // per "populated -> drained" transition.
    template<typename ...Args>
    void visit_out (Args&& ...args) {
        on_unlock(std::forward<Args>(args)...);
    }

protected:
    ~elastic_barrier() = default;

private:
    std::atomic<size_t> incoming_requests; // visitors which checked in
    std::atomic<size_t> outgoing_requests; // visitors past initialization
    std::atomic<size_t> pending_requests;  // visitors busy-waiting for init
    std::atomic<bool> reinit;              // deinit happened; re-init needed

    template<typename ...Args>
    void on_first_in(Args&& ...args) {
        get_self()->on_first_in_impl(std::forward<Args>(args)...);
    }

    template<typename ...Args>
    void on_last_out(Args&& ...args) {
        get_self()->on_last_out_impl(std::forward<Args>(args)...);
    }

    template<typename ...Args>
    void on_lock(Args&& ...args) {
        // Each `incoming` request must check in before acquiring the resource.
        size_t thread_id = incoming_requests.fetch_add(1);
        if (thread_id == 0) {
            // Only the first `incoming` request (the main one) initializes the
            // resource, and only while no `outgoing` requests exist yet.
            if (outgoing_requests.load() == 0) {
                get_self()->on_first_in(std::forward<Args>(args)...);
                // Non-zero `outgoing` count signals all further requests that
                // the resource is already initialized.
                outgoing_requests.fetch_add(1);
                return;
            }
            return;
        } else {
            // Either (CASE 1) the main `incoming` request is initializing the
            // resource right now, or (CASE 2) every request arrived while
            // `on_unlock` was draining in another thread and no main request
            // exists at all. Both cases mean busy-wait here.
            size_t busy_thread_id = pending_requests.fetch_add(1);

            while (outgoing_requests.load() == 0) {
                // "All incoming are pending" can only hold in CASE 2, i.e.
                // after `on_unlock` in another thread has fully finished.
                if (pending_requests.load() == incoming_requests.load()) {
                    // Only the first busy-waiter probes whether a deinit
                    // really happened and, if so, redoes the initialization.
                    if (busy_thread_id == 0) {
                        bool expected_reinit = true;
                        if (!reinit.compare_exchange_strong(expected_reinit, false)) {
                            // Deinit was called off in `on_unlock` (we arrived
                            // in time): nothing to redo.
                            break; // just quit busy loop
                        } else {
                            // Deinit did happen in `on_unlock`: the main
                            // busy-waiter re-initializes and becomes outgoing.
                            get_self()->on_first_in(std::forward<Args>(args)...);
                            outgoing_requests.fetch_add(1);
                            pending_requests.fetch_sub(1);
                            return;
                        }
                    }
                }
            }

            // Resource is ready: stop pending and become `outgoing`.
            outgoing_requests++;
            pending_requests.fetch_sub(1);
        }
        return;
    }

    template<typename ...Args>
    void on_unlock(Args&& ...args) {
        // Each released request checks out; the last `outgoing` one may
        // deinitialize the resource if nobody else is waiting for it.
        size_t thread_id = outgoing_requests.fetch_sub(1);
        if (thread_id == 1) {
            // Deinit is only safe while our own still-registered `incoming`
            // slot is the last one left: decrementing `incoming` before the
            // deinit would let another thread start a conflicting
            // initialization in `on_lock`.
            if (incoming_requests.load() == 1) {
                // Single-threaded critical section for the main `outgoing`
                // request (newcomers are spinning in CASE 2 of `on_lock`).
                get_self()->on_last_out(std::forward<Args>(args)...);

                // Tell subsequent busy-waiters in `on_lock` that deinit
                // happened and the resource must be re-initialized.
                reinit.store(true);

                // Release our `incoming` slot; from now on the
                // pending == incoming equality may fire in `on_lock`.
                incoming_requests.fetch_sub(1);
                return;
            }

            // New `incoming` requests appeared right before we could
            // deinitialize — call the deinit off.
        }
        incoming_requests.fetch_sub(1);
    }

    elastic_barrier(const elastic_barrier&) = delete;
    elastic_barrier(elastic_barrier&&) = delete;
    // FIX: these two were declared as `operator()`, which deletes the call
    // operator — the intent (matching the deleted ctors above) is clearly to
    // delete copy/move *assignment*.
    elastic_barrier& operator= (const elastic_barrier&) = delete;
    elastic_barrier& operator= (elastic_barrier&&) = delete;
};
||||
} // namespace onevpl
|
||||
} // namespace wip
|
||||
} // namespace gapi
|
||||
} // namespace cv
|
||||
|
||||
#endif // GAPI_STREAMING_ONEVPL_ACCELERATORS_UTILS_ELASTIC_BARRIER_HPP
|
@ -0,0 +1,95 @@ |
||||
// This file is part of OpenCV project.
|
||||
// It is subject to the license terms in the LICENSE file found in the top-level directory
|
||||
// of this distribution and at http://opencv.org/license.html.
|
||||
//
|
||||
// Copyright (C) 2021 Intel Corporation
|
||||
|
||||
#include <thread> |
||||
#include "streaming/onevpl/accelerators/utils/shared_lock.hpp" |
||||
|
||||
namespace cv { |
||||
namespace gapi { |
||||
namespace wip { |
||||
namespace onevpl { |
||||
|
||||
SharedLock::SharedLock() {
    // Start in the fully released state: no writer, zero readers.
    exclusive_lock = false;
    shared_counter = 0u;
}
||||
|
||||
// Acquire the lock in shared (reader) mode, spinning while a writer holds
// it. Returns the reader count observed just before this reader registered.
size_t SharedLock::shared_lock() {
    size_t prev = 0;
    bool in_progress = false;
    bool pred_excl = exclusive_lock.load();
    do {
        if (!pred_excl) {
            // if no exclusive lock then start shared lock transaction
            prev = shared_counter.fetch_add(1);
            in_progress = true; // transaction is in progress
        } else {
            // A writer is (or became) active: roll back our optimistic
            // registration, if any, and yield before retrying.
            if (in_progress) {
                in_progress = false;
                shared_counter.fetch_sub(1);
            }
            std::this_thread::yield();
        }

        // test if exclusive lock happened before
        pred_excl = exclusive_lock.load();
    } while (pred_excl || !in_progress);

    return prev;
}
||||
|
||||
size_t SharedLock::unlock_shared() {
    // Drop one reader; yields the reader count prior to the decrement
    // (atomic post-decrement returns the old value).
    return shared_counter--;
}
||||
|
||||
// Acquire the lock in exclusive (writer) mode: wait until no readers are
// registered, win the writer CAS, then re-check readers and back off if any
// slipped in between the check and the CAS.
void SharedLock::lock() {
    bool in_progress = false;
    size_t prev_shared = shared_counter.load();
    do {
        if (prev_shared == 0) {
            bool expected = false;
            // spin until the writer flag becomes ours
            while (!exclusive_lock.compare_exchange_strong(expected, true)) {
                expected = false;
                std::this_thread::yield();
            }
            in_progress = true;
        } else {
            // readers are active: release the writer flag if we had taken it
            if (in_progress) {
                in_progress = false;
                exclusive_lock.store(false);
            }
            std::this_thread::yield();
        }
        prev_shared = shared_counter.load();
    } while (prev_shared != 0 || !in_progress);
}
||||
|
||||
bool SharedLock::try_lock() { |
||||
if (shared_counter.load() != 0) { |
||||
return false; |
||||
} |
||||
|
||||
bool expected = false; |
||||
if (exclusive_lock.compare_exchange_strong(expected, true)) { |
||||
if (shared_counter.load() == 0) { |
||||
return true; |
||||
} else { |
||||
exclusive_lock.store(false); |
||||
} |
||||
} |
||||
return false; |
||||
} |
||||
|
||||
void SharedLock::unlock() {
    // Release exclusive ownership; spinning readers/writers may now proceed.
    exclusive_lock = false;
}
||||
bool SharedLock::owns() const { |
||||
return exclusive_lock.load(); |
||||
} |
||||
} // namespace onevpl
|
||||
} // namespace wip
|
||||
} // namespace gapi
|
||||
} // namespace cv
|
@ -0,0 +1,47 @@ |
||||
// This file is part of OpenCV project.
|
||||
// It is subject to the license terms in the LICENSE file found in the top-level directory
|
||||
// of this distribution and at http://opencv.org/license.html.
|
||||
//
|
||||
// Copyright (C) 2021 Intel Corporation
|
||||
|
||||
#ifndef GAPI_STREAMING_ONEVPL_ACCELERATORS_SURFACE_SHARED_LOCK_HPP |
||||
#define GAPI_STREAMING_ONEVPL_ACCELERATORS_SURFACE_SHARED_LOCK_HPP |
||||
|
||||
#include <atomic> |
||||
#include <memory> |
||||
|
||||
#include "opencv2/gapi/own/exports.hpp" // GAPI_EXPORTS |
||||
|
||||
namespace cv { |
||||
namespace gapi { |
||||
namespace wip { |
||||
namespace onevpl { |
||||
|
||||
class GAPI_EXPORTS SharedLock { |
||||
public: |
||||
SharedLock(); |
||||
~SharedLock() = default; |
||||
|
||||
size_t shared_lock(); |
||||
size_t unlock_shared(); |
||||
|
||||
void lock(); |
||||
bool try_lock(); |
||||
void unlock(); |
||||
|
||||
bool owns() const; |
||||
private: |
||||
SharedLock(const SharedLock&) = delete; |
||||
SharedLock& operator= (const SharedLock&) = delete; |
||||
SharedLock(SharedLock&&) = delete; |
||||
SharedLock& operator== (SharedLock&&) = delete; |
||||
|
||||
std::atomic<bool> exclusive_lock; |
||||
std::atomic<size_t> shared_counter; |
||||
}; |
||||
} // namespace onevpl
|
||||
} // namespace wip
|
||||
} // namespace gapi
|
||||
} // namespace cv
|
||||
|
||||
#endif // GAPI_STREAMING_ONEVPL_ACCELERATORS_SURFACE_SHARED_LOCK_HPP
|
@ -0,0 +1,25 @@ |
||||
#ifndef GAPI_STREAMING_ONEVPL_EXPORT_HPP |
||||
#define GAPI_STREAMING_ONEVPL_EXPORT_HPP |
||||
|
||||
#if defined(_MSC_VER) |
||||
#pragma warning(push) |
||||
#pragma warning(disable : 4201) |
||||
#pragma warning(disable : 4302) |
||||
#pragma warning(disable : 4311) |
||||
#pragma warning(disable : 4312) |
||||
#endif // defined(_MSC_VER)
|
||||
|
||||
#ifdef HAVE_ONEVPL |
||||
#if (MFX_VERSION >= 2000) |
||||
#include <vpl/mfxdispatcher.h> |
||||
#endif // MFX_VERSION
|
||||
|
||||
#include <vpl/mfx.h> |
||||
#include <vpl/mfxvideo.h> |
||||
#endif // HAVE_ONEVPL
|
||||
|
||||
#if defined(_MSC_VER) |
||||
#pragma warning(pop) |
||||
#endif // defined(_MSC_VER)
|
||||
|
||||
#endif // GAPI_STREAMING_ONEVPL_EXPORT_HPP
|
@ -0,0 +1,86 @@ |
||||
// This file is part of OpenCV project.
|
||||
// It is subject to the license terms in the LICENSE file found in the top-level directory
|
||||
// of this distribution and at http://opencv.org/license.html.
|
||||
//
|
||||
// Copyright (C) 2021 Intel Corporation
|
||||
|
||||
#ifndef OPENCV_GAPI_STREAMING_TESTS_COMMON_HPP |
||||
#define OPENCV_GAPI_STREAMING_TESTS_COMMON_HPP |
||||
|
||||
#include "gapi_tests_common.hpp" |
||||
#include <opencv2/gapi/streaming/onevpl/source.hpp> |
||||
#include <opencv2/gapi/streaming/onevpl/data_provider_interface.hpp> |
||||
#include "streaming/onevpl/data_provider_defines.hpp" |
||||
|
||||
#ifdef HAVE_ONEVPL |
||||
#include "streaming/onevpl/onevpl_export.hpp" |
||||
|
||||
namespace opencv_test { |
||||
namespace streaming { |
||||
namespace onevpl { |
||||
|
||||
struct StreamDataProvider : public cv::gapi::wip::onevpl::IDataProvider { |
||||
|
||||
StreamDataProvider(std::istream& in) : data_stream (in) { |
||||
EXPECT_TRUE(in); |
||||
} |
||||
|
||||
mfx_codec_id_type get_mfx_codec_id() const override { |
||||
return MFX_CODEC_HEVC; |
||||
} |
||||
|
||||
bool fetch_bitstream_data(std::shared_ptr<mfx_bitstream> &out_bitstream) override { |
||||
if (empty()) { |
||||
return false; |
||||
} |
||||
|
||||
if (!out_bitstream) { |
||||
out_bitstream = std::make_shared<mfx_bitstream>(); |
||||
out_bitstream->MaxLength = 2000000; |
||||
out_bitstream->Data = (mfxU8 *)calloc(out_bitstream->MaxLength, sizeof(mfxU8)); |
||||
if(!out_bitstream->Data) { |
||||
throw std::runtime_error("Cannot allocate bitstream.Data bytes: " + |
||||
std::to_string(out_bitstream->MaxLength * sizeof(mfxU8))); |
||||
} |
||||
out_bitstream->CodecId = get_mfx_codec_id(); |
||||
} |
||||
|
||||
mfxU8 *p0 = out_bitstream->Data; |
||||
mfxU8 *p1 = out_bitstream->Data + out_bitstream->DataOffset; |
||||
EXPECT_FALSE(out_bitstream->DataOffset > out_bitstream->MaxLength - 1); |
||||
EXPECT_FALSE(out_bitstream->DataLength + out_bitstream->DataOffset > out_bitstream->MaxLength); |
||||
|
||||
std::copy_n(p1, out_bitstream->DataLength, p0); |
||||
|
||||
out_bitstream->DataOffset = 0; |
||||
out_bitstream->DataLength += static_cast<mfxU32>(fetch_data(out_bitstream->MaxLength - out_bitstream->DataLength, |
||||
out_bitstream->Data + out_bitstream->DataLength)); |
||||
return out_bitstream->DataLength != 0; |
||||
} |
||||
|
||||
size_t fetch_data(size_t out_data_size, void* out_data_buf) { |
||||
data_stream.read(reinterpret_cast<char*>(out_data_buf), out_data_size); |
||||
return data_stream.gcount(); |
||||
} |
||||
bool empty() const override { |
||||
return data_stream.eof() || data_stream.bad(); |
||||
} |
||||
private: |
||||
std::istream& data_stream; |
||||
}; |
||||
|
||||
// Pre-baked HEVC elementary stream chunk used as test input. Judging by the
// Annex-B start codes and NAL unit headers it contains a VPS (0x40), an SPS
// (0x42), a PPS (0x44) and the beginning of an IDR slice (0x26) — TODO
// confirm the exact NAL layout against a bitstream analyzer.
static const unsigned char hevc_header[] = {
    0x00, 0x00, 0x00, 0x01, 0x40, 0x01, 0x0C, 0x06, 0xFF, 0xFF, 0x01, 0x40, 0x00,
    0x00, 0x03, 0x00, 0x80, 0x00, 0x00, 0x03, 0x00, 0x00, 0x03, 0x00, 0x78, 0x00,
    0x00, 0x04, 0x02, 0x10, 0x30, 0x00, 0x00, 0x03, 0x00, 0x10, 0x00, 0x00, 0x03,
    0x01, 0xE5, 0x00, 0x00, 0x00, 0x01, 0x42, 0x01, 0x06, 0x01, 0x40, 0x00, 0x00,
    0x03, 0x00, 0x80, 0x00, 0x00, 0x03, 0x00, 0x00, 0x03, 0x00, 0x78, 0x00, 0x00,
    0xA0, 0x10, 0x20, 0x61, 0x63, 0x41, 0x00, 0x86, 0x49, 0x1B, 0x2B, 0x20, 0x00,
    0x00, 0x00, 0x01, 0x44, 0x01, 0xC0, 0x71, 0xC0, 0xD9, 0x20, 0x00, 0x00, 0x00,
    0x01, 0x26, 0x01, 0xAF, 0x0C
};
||||
} // namespace onevpl
|
||||
} // namespace streaming
|
||||
} // namespace opencv_test
|
||||
#endif // HAVE_ONEVPL
|
||||
#endif // OPENCV_GAPI_STREAMING_TESTS_HPP
|
@ -0,0 +1,348 @@ |
||||
// This file is part of OpenCV project.
|
||||
// It is subject to the license terms in the LICENSE file found in the top-level directory
|
||||
// of this distribution and at http://opencv.org/license.html.
|
||||
//
|
||||
// Copyright (C) 2021 Intel Corporation
|
||||
|
||||
|
||||
#include "../test_precomp.hpp" |
||||
|
||||
#include "../common/gapi_streaming_tests_common.hpp" |
||||
|
||||
#include <chrono> |
||||
#include <future> |
||||
|
||||
#define private public |
||||
#include "streaming/onevpl/accelerators/utils/shared_lock.hpp" |
||||
#undef private |
||||
|
||||
#include "streaming/onevpl/accelerators/utils/elastic_barrier.hpp" |
||||
|
||||
namespace opencv_test |
||||
{ |
||||
namespace |
||||
{ |
||||
using cv::gapi::wip::onevpl::SharedLock; |
||||
|
||||
// elastic_barrier test impl: records which visitor performed init/deinit
// and checks via static re-entrancy counters that the callbacks are never
// executed concurrently.
struct TestBarrier : public cv::gapi::wip::onevpl::elastic_barrier<TestBarrier> {
    void on_first_in_impl(size_t visitor_id) {

        static std::atomic<int> reentrancy_guard{};
        reentrancy_guard++;
        EXPECT_EQ(reentrancy_guard.load(), 1); // must be the only one inside

        visitors_in.insert(visitor_id);
        last_visitor_id = visitor_id;

        reentrancy_guard--;
        EXPECT_EQ(reentrancy_guard.load(), 0);
    }

    void on_last_out_impl(size_t visitor_id) {

        static std::atomic<int> reentrancy_guard{};
        reentrancy_guard++;
        EXPECT_EQ(reentrancy_guard.load(), 1); // must be the only one inside

        visitors_out.insert(visitor_id);
        last_visitor_id = visitor_id;

        reentrancy_guard--;
        EXPECT_EQ(reentrancy_guard.load(), 0);
    }

    size_t last_visitor_id = 0;       // who triggered the latest init/deinit
    std::set<size_t> visitors_in;     // distinct visitors which ran init
    std::set<size_t> visitors_out;    // distinct visitors which ran deinit
};
||||
|
||||
TEST(OneVPL_SharedLock, Create) {
    // A freshly constructed lock must start with no readers registered.
    SharedLock sut;
    EXPECT_EQ(size_t{0}, sut.shared_counter.load());
}
||||
|
||||
TEST(OneVPL_SharedLock, Read_SingleThread)
{
    // Repeated shared locking from one thread: the counter tracks the depth
    // and never escalates to exclusive ownership.
    SharedLock sut;

    const size_t read_depth = 100;
    size_t step = 0;
    while (step++ < read_depth) {
        sut.shared_lock();
        EXPECT_FALSE(sut.owns());
    }
    EXPECT_EQ(sut.shared_counter.load(), read_depth);

    step = 0;
    while (step++ < read_depth) {
        sut.unlock_shared();
        EXPECT_FALSE(sut.owns());
    }

    EXPECT_EQ(sut.shared_counter.load(), size_t{0});
}
||||
|
||||
TEST(OneVPL_SharedLock, TryLock_SingleThread)
{
    // try_lock on an uncontended lock must succeed and grant ownership.
    SharedLock sut;

    EXPECT_TRUE(sut.try_lock());
    EXPECT_TRUE(sut.owns());

    sut.unlock();
    EXPECT_FALSE(sut.owns());
    EXPECT_EQ(sut.shared_counter.load(), size_t{0});
}
||||
|
||||
TEST(OneVPL_SharedLock, Write_SingleThread)
{
    // Blocking exclusive acquire/release round-trip on an uncontended lock.
    SharedLock sut;

    sut.lock();
    EXPECT_TRUE(sut.owns());

    sut.unlock();
    EXPECT_FALSE(sut.owns());
    EXPECT_EQ(sut.shared_counter.load(), size_t{0});
}
||||
|
||||
TEST(OneVPL_SharedLock, TryLockTryLock_SingleThread)
{
    // A second try_lock while already owned must fail (non-recursive lock).
    SharedLock sut;

    sut.try_lock();
    EXPECT_FALSE(sut.try_lock());
    sut.unlock();

    EXPECT_FALSE(sut.owns());
}
||||
|
||||
TEST(OneVPL_SharedLock, ReadTryLock_SingleThread)
{
    // An active reader blocks try_lock; once it leaves, try_lock succeeds.
    SharedLock sut;

    sut.shared_lock();
    EXPECT_FALSE(sut.owns());
    EXPECT_FALSE(sut.try_lock());
    sut.unlock_shared();

    EXPECT_TRUE(sut.try_lock());
    EXPECT_TRUE(sut.owns());
    sut.unlock();
}
||||
|
||||
TEST(OneVPL_SharedLock, WriteTryLock_SingleThread)
{
    // try_lock must fail while lock() is held and succeed after unlock().
    SharedLock sut;

    sut.lock();
    EXPECT_TRUE(sut.owns());
    EXPECT_FALSE(sut.try_lock());
    sut.unlock();

    EXPECT_TRUE(sut.try_lock());
    EXPECT_TRUE(sut.owns());
    sut.unlock();
}
||||
|
||||
|
||||
TEST(OneVPL_SharedLock, Write_MultiThread)
{
    // Three threads (main + worker + sub-worker) each perform `inc_count`
    // increments of a plain size_t under the exclusive lock; the exact final
    // sum proves mutual exclusion of writers.
    SharedLock lock;

    // One-shot start barrier so all threads begin incrementing together.
    std::promise<void> barrier;
    std::shared_future<void> sync = barrier.get_future();

    static const size_t inc_count = 10000000;
    size_t shared_value = 0;
    auto work = [&lock, &shared_value](size_t count) {
        for (size_t i = 0; i < count; i ++) {
            lock.lock();
            shared_value ++;
            lock.unlock();
        }
    };

    std::thread worker_thread([&barrier, sync, work] () {

        // The innermost thread releases the start barrier for everyone.
        std::thread sub_worker([&barrier, work] () {
            barrier.set_value();
            work(inc_count);
        });

        sync.wait();
        work(inc_count);
        sub_worker.join();
    });
    sync.wait();

    // utilize the main thread as the third writer
    work(inc_count);
    worker_thread.join();

    EXPECT_EQ(shared_value, inc_count * 3);
}
||||
|
||||
TEST(OneVPL_SharedLock, ReadWrite_MultiThread)
{
    // One writer thread increments under the exclusive lock while this
    // thread observes the value under the shared lock: the observed sequence
    // must be monotonically non-decreasing.
    SharedLock lock;

    std::promise<void> barrier;
    std::future<void> sync = barrier.get_future();

    static const size_t inc_count = 10000000;
    size_t shared_value = 0;
    auto write_work = [&lock, &shared_value](size_t count) {
        for (size_t i = 0; i < count; i ++) {
            lock.lock();
            shared_value ++;
            lock.unlock();
        }
    };

    auto read_work = [&lock, &shared_value](size_t count) {
        // FIX: the baseline used to be read from `shared_value` without
        // holding any lock, racing with the already-running writer (UB).
        // The counter starts at zero and only grows, so zero is a correct
        // initial baseline.
        size_t old_shared_value = 0;
        for (size_t i = 0; i < count; i ++) {
            lock.shared_lock();
            EXPECT_TRUE(shared_value >= old_shared_value);
            old_shared_value = shared_value;
            lock.unlock_shared();
        }
    };

    std::thread writer_thread([&barrier, write_work] () {
        barrier.set_value();
        write_work(inc_count);
    });
    sync.wait();

    read_work(inc_count);
    writer_thread.join();

    EXPECT_EQ(shared_value, inc_count);
}
||||
|
||||
|
||||
TEST(OneVPL_ElasticBarrier, single_thread_visit)
{
    // With a single thread only the very first visit_in triggers the init
    // callback and only the very last visit_out triggers the deinit one.
    TestBarrier tb;

    const size_t total_visits = 10000;
    size_t id = 0;
    for (id = 0; id < total_visits; id++) {
        tb.visit_in(id);
        EXPECT_EQ(tb.visitors_in.size(), size_t{1});
    }
    EXPECT_EQ(tb.last_visitor_id, size_t{0});
    EXPECT_EQ(tb.visitors_out.size(), size_t{0});

    for (id = 0; id < total_visits; id++) {
        tb.visit_out(id);
        EXPECT_EQ(tb.visitors_in.size(), size_t{1});
    }
    EXPECT_EQ(tb.last_visitor_id, id - 1);
    EXPECT_EQ(tb.visitors_out.size(), size_t{1});
}
||||
|
||||
|
||||
TEST(OneVPL_ElasticBarrier, multi_thread_visit)
{
    // Two-phase stress test: all threads first pour `visit_in`s into the
    // barrier (phase 1), rendezvous, check that exactly one FIRST visit
    // triggered init, then pour `visit_out`s (phase 2) and check that
    // exactly one LAST visit triggered deinit.
    TestBarrier tested_barrier;

    static const size_t max_visit_count = 10000000;
    std::atomic<size_t> visit_in_wait_counter{};
    std::promise<void> start_sync_barrier;
    std::shared_future<void> start_sync = start_sync_barrier.get_future();
    std::promise<void> phase_sync_barrier;
    std::shared_future<void> phase_sync = phase_sync_barrier.get_future();

    // Each worker visits its own disjoint id range [worker_id*N, (worker_id+1)*N).
    auto visit_worker_job = [&tested_barrier,
                             &visit_in_wait_counter,
                             start_sync,
                             phase_sync] (size_t worker_id) {

        start_sync.wait();

        // first phase
        const size_t begin_range = worker_id * max_visit_count;
        const size_t end_range = (worker_id + 1) * max_visit_count;
        for (size_t visit_id = begin_range; visit_id < end_range; visit_id++) {
            tested_barrier.visit_in(visit_id);
        }

        // notify all worker first phase ready
        visit_in_wait_counter.fetch_add(1);

        // wait main second phase
        phase_sync.wait();

        // second phase
        for (size_t visit_id = begin_range; visit_id < end_range; visit_id++) {
            tested_barrier.visit_out(visit_id);
        }
    };

    // The main thread's variant additionally verifies the phase-1 invariant
    // and releases the phase barrier for the workers.
    auto visit_main_job = [&tested_barrier,
                           &visit_in_wait_counter,
                           &phase_sync_barrier] (size_t total_workers_count,
                                                 size_t worker_id) {

        const size_t begin_range = worker_id * max_visit_count;
        const size_t end_range = (worker_id + 1) * max_visit_count;
        for (size_t visit_id = begin_range; visit_id < end_range; visit_id++) {
            tested_barrier.visit_in(visit_id);
        }

        // wait all workers first phase done
        visit_in_wait_counter.fetch_add(1);
        while (visit_in_wait_counter.load() != total_workers_count) {
            std::this_thread::yield();
        };

        // TEST invariant: last_visitor_id MUST be one from any FIRST worker visitor_id
        bool one_of_available_ids_matched = false;
        for (size_t id = 0; id < total_workers_count; id ++) {
            size_t expected_last_visitor_for_id = id * max_visit_count;
            one_of_available_ids_matched |=
                (tested_barrier.last_visitor_id == expected_last_visitor_for_id) ;
        }
        EXPECT_TRUE(one_of_available_ids_matched);

        // unblock all workers to work out second phase
        phase_sync_barrier.set_value();

        // continue second phase
        for (size_t visit_id = begin_range; visit_id < end_range; visit_id++) {
            tested_barrier.visit_out(visit_id);
        }
    };

    size_t max_worker_count = std::thread::hardware_concurrency();
    if (max_worker_count < 2) {
        max_worker_count = 2; // logical 2 threads required at least
    }
    std::vector<std::thread> workers;
    workers.reserve(max_worker_count);
    // worker 0 is the main thread itself - spawn ids 1..max_worker_count-1
    for (size_t worker_id = 1; worker_id < max_worker_count; worker_id++) {
        workers.emplace_back(visit_worker_job, worker_id);
    }

    // let's go for first phase
    start_sync_barrier.set_value();

    // utilize main thread as well
    visit_main_job(max_worker_count, 0);

    // join all threads second phase
    for (auto& w : workers) {
        w.join();
    }

    // TEST invariant: last_visitor_id MUST be one from any LATTER worker visitor_id
    bool one_of_available_ids_matched = false;
    for (size_t id = 0; id < max_worker_count; id ++) {
        one_of_available_ids_matched |=
            (tested_barrier.last_visitor_id == ((id + 1) * max_visit_count - 1)) ;
    }
    EXPECT_TRUE(one_of_available_ids_matched);
}
} |
||||
} // opencv_test
|
Loading…
Reference in new issue