Merge pull request #24845 from TolyaTalamanov:at/concurrent-executor

G-API: Implement concurrent executor #24845

## Overview
This PR introduces the new G-API executor called `GThreadedExecutor` which can be selected when the `GComputation` is compiled in `serial` mode (a.k.a `GComputation::compile(...)`)

### ThreadPool
`cv::gapi::own::ThreadPool` has been introduced in order to abstract usage of threads in `GThreadedExecutor`.
`ThreadPool` is implemented by using  `own::concurrent_bounded_queue`

`ThreadPool` has only as single method `schedule` that will push task into the queue for the further execution.
The **important** notice is that if `Task` executed in `ThreadPool` throws exception - this is `UB`. 

### GThreadedExecutor
The `GThreadedExecutor` is mostly copy-paste of `GExecutor`, should we extend `GExecutor` instead? 

#### Implementation details
1. Build the dependency graph for `Island` nodes.
2. Store the tasks that don't have dependencies into separate `vector` in order to run them first.
3. at the `GThreadedExecutor::run()` schedule the tasks that don't have dependencies that will schedule their dependents and wait for the completion.


### Pull Request Readiness Checklist

See details at https://github.com/opencv/opencv/wiki/How_to_contribute#making-a-good-pull-request

- [ ] I agree to contribute to the project under Apache 2 License.
- [ ] To the best of my knowledge, the proposed patch is not based on a code under GPL or another license that is incompatible with OpenCV
- [ ] The PR is proposed to the proper branch
- [ ] There is a reference to the original bug report and related work
- [ ] There is accuracy test, performance test and test data in opencv_extra repository, if applicable
      Patch to opencv_extra has the same branch name.
- [ ] The feature is well documented and sample code can be built with the project CMake
pull/24942/head
Anatoliy Talamanov 12 months ago committed by GitHub
parent 87f749277d
commit 8e43c8f200
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 3
      modules/gapi/CMakeLists.txt
  2. 20
      modules/gapi/include/opencv2/gapi/gcommon.hpp
  3. 18
      modules/gapi/src/api/gcommon.cpp
  4. 11
      modules/gapi/src/compiler/gcompiler.cpp
  5. 511
      modules/gapi/src/executor/gthreadedexecutor.cpp
  6. 123
      modules/gapi/src/executor/gthreadedexecutor.hpp
  7. 67
      modules/gapi/src/executor/thread_pool.cpp
  8. 71
      modules/gapi/src/executor/thread_pool.hpp
  9. 59
      modules/gapi/test/gapi_sample_pipelines.cpp
  10. 124
      modules/gapi/test/own/thread_pool_tests.cpp

@ -79,6 +79,7 @@ set(gapi_srcs
src/api/gframe.cpp
src/api/gkernel.cpp
src/api/gbackend.cpp
src/api/gcommon.cpp
src/api/gproto.cpp
src/api/gnode.cpp
src/api/gcall.cpp
@ -121,8 +122,10 @@ set(gapi_srcs
src/executor/gabstractstreamingexecutor.cpp
src/executor/gexecutor.cpp
src/executor/gtbbexecutor.cpp
src/executor/gthreadedexecutor.cpp
src/executor/gstreamingexecutor.cpp
src/executor/gasync.cpp
src/executor/thread_pool.cpp
# CPU Backend (currently built-in)
src/backends/cpu/gcpubackend.cpp

@ -263,12 +263,32 @@ struct graph_dump_path
};
/** @} */
/**
* @brief Ask G-API to use threaded executor when cv::GComputation
* is compiled via cv::GComputation::compile method.
*
* Specifies a number of threads that should be used by executor.
*/
struct GAPI_EXPORTS use_threaded_executor
{
use_threaded_executor();
explicit use_threaded_executor(const uint32_t nthreads);
uint32_t num_threads;
};
/** @} */
namespace detail
{
template<> struct CompileArgTag<cv::graph_dump_path>
{
static const char* tag() { return "gapi.graph_dump_path"; }
};
template<> struct CompileArgTag<cv::use_threaded_executor>
{
static const char* tag() { return "gapi.threaded_executor"; }
};
}
} // namespace cv

@ -0,0 +1,18 @@
// This file is part of OpenCV project.
// It is subject to the license terms in the LICENSE file found in the top-level directory
// of this distribution and at http://opencv.org/license.html.
//
// Copyright (C) 2024 Intel Corporation
#include "precomp.hpp"
#include <opencv2/gapi/gcommon.hpp>
#include <opencv2/core/utility.hpp>
cv::use_threaded_executor::use_threaded_executor()
: num_threads(cv::getNumThreads()) {
}
cv::use_threaded_executor::use_threaded_executor(const uint32_t nthreads)
: num_threads(nthreads) {
}

@ -33,6 +33,7 @@
#include "compiler/passes/pattern_matching.hpp"
#include "executor/gexecutor.hpp"
#include "executor/gthreadedexecutor.hpp"
#include "executor/gstreamingexecutor.hpp"
#include "backends/common/gbackend.hpp"
#include "backends/common/gmetabackend.hpp"
@ -452,8 +453,16 @@ cv::GCompiled cv::gimpl::GCompiler::produceCompiled(GPtr &&pg)
.get<OutputMeta>().outMeta;
// FIXME: select which executor will be actually used,
// make GExecutor abstract.
std::unique_ptr<GExecutor> pE(new GExecutor(std::move(pg)));
auto use_threaded_exec = cv::gapi::getCompileArg<cv::use_threaded_executor>(m_args);
std::unique_ptr<GAbstractExecutor> pE;
if (use_threaded_exec) {
const auto num_threads = use_threaded_exec.value().num_threads;
GAPI_LOG_INFO(NULL, "Threaded executor with " << num_threads << " thread(s) will be used");
pE.reset(new GThreadedExecutor(num_threads, std::move(pg)));
} else {
pE.reset(new GExecutor(std::move(pg)));
}
GCompiled compiled;
compiled.priv().setup(m_metas, outMetas, std::move(pE));

@ -0,0 +1,511 @@
// This file is part of OpenCV project.
// It is subject to the license terms in the LICENSE file found in the top-level directory
// of this distribution and at http://opencv.org/license.html.
//
// Copyright (C) 2024 Intel Corporation
#include "precomp.hpp"
#include <ade/util/zip_range.hpp>
#include <opencv2/gapi/opencv_includes.hpp>
#include "api/gproto_priv.hpp" // ptr(GRunArgP)
#include "executor/gthreadedexecutor.hpp"
#include "compiler/passes/passes.hpp"
namespace cv {
namespace gimpl {
namespace magazine {
namespace {
void bindInArgExec(Mag& mag, const RcDesc &rc, const GRunArg &arg) {
if (rc.shape != GShape::GMAT) {
bindInArg(mag, rc, arg);
return;
}
auto& mag_rmat = mag.template slot<cv::RMat>()[rc.id];
switch (arg.index()) {
case GRunArg::index_of<Mat>() :
mag_rmat = make_rmat<RMatOnMat>(util::get<Mat>(arg));
break;
case GRunArg::index_of<cv::RMat>() :
mag_rmat = util::get<cv::RMat>(arg);
break;
default: util::throw_error(std::logic_error("content type of the runtime argument does not match to resource description ?"));
}
// FIXME: has to take extra care about meta here for this particuluar
// case, just because this function exists at all
mag.meta<cv::RMat>()[rc.id] = arg.meta;
}
void bindOutArgExec(Mag& mag, const RcDesc &rc, const GRunArgP &arg) {
if (rc.shape != GShape::GMAT) {
bindOutArg(mag, rc, arg);
return;
}
auto& mag_rmat = mag.template slot<cv::RMat>()[rc.id];
switch (arg.index()) {
case GRunArgP::index_of<Mat*>() :
mag_rmat = make_rmat<RMatOnMat>(*util::get<Mat*>(arg)); break;
case GRunArgP::index_of<cv::RMat*>() :
mag_rmat = *util::get<cv::RMat*>(arg); break;
default: util::throw_error(std::logic_error("content type of the runtime argument does not match to resource description ?"));
}
}
cv::GRunArgP getObjPtrExec(Mag& mag, const RcDesc &rc) {
if (rc.shape != GShape::GMAT) {
return getObjPtr(mag, rc);
}
return GRunArgP(&mag.slot<cv::RMat>()[rc.id]);
}
void writeBackExec(const Mag& mag, const RcDesc &rc, GRunArgP &g_arg) {
if (rc.shape != GShape::GMAT) {
writeBack(mag, rc, g_arg);
return;
}
switch (g_arg.index()) {
case GRunArgP::index_of<cv::Mat*>() : {
// If there is a copy intrinsic at the end of the graph
// we need to actually copy the data to the user buffer
// since output runarg was optimized to simply point
// to the input of the copy kernel
// FIXME:
// Rework, find a better way to check if there should be
// a real copy (add a pass to StreamingBackend?)
// NB: In case RMat adapter not equal to "RMatOnMat" need to
// copy data back to the host as well.
auto& out_mat = *util::get<cv::Mat*>(g_arg);
const auto& rmat = mag.template slot<cv::RMat>().at(rc.id);
auto* adapter = rmat.get<RMatOnMat>();
if ((adapter != nullptr && out_mat.data != adapter->data()) ||
(adapter == nullptr)) {
auto view = rmat.access(RMat::Access::R);
asMat(view).copyTo(out_mat);
}
break;
}
case GRunArgP::index_of<cv::RMat*>() : /* do nothing */ break;
default: util::throw_error(std::logic_error("content type of the runtime argument does not match to resource description ?"));
}
}
void assignMetaStubExec(Mag& mag, const RcDesc &rc, const cv::GRunArg::Meta &meta) {
switch (rc.shape) {
case GShape::GARRAY: mag.meta<cv::detail::VectorRef>()[rc.id] = meta; break;
case GShape::GOPAQUE: mag.meta<cv::detail::OpaqueRef>()[rc.id] = meta; break;
case GShape::GSCALAR: mag.meta<cv::Scalar>()[rc.id] = meta; break;
case GShape::GFRAME: mag.meta<cv::MediaFrame>()[rc.id] = meta; break;
case GShape::GMAT:
mag.meta<cv::Mat>() [rc.id] = meta;
mag.meta<cv::RMat>()[rc.id] = meta;
#if !defined(GAPI_STANDALONE)
mag.meta<cv::UMat>()[rc.id] = meta;
#endif
break;
default: util::throw_error(std::logic_error("Unsupported GShape type")); break;
}
}
} // anonymous namespace
}}} // namespace cv::gimpl::magazine
cv::gimpl::StreamMsg cv::gimpl::GThreadedExecutor::Input::get() {
std::lock_guard<std::mutex> lock{m_state.m};
cv::GRunArgs res;
for (const auto &rc : desc()) { res.emplace_back(magazine::getArg(m_state.mag, rc)); }
return cv::gimpl::StreamMsg{std::move(res)};
}
cv::gimpl::GThreadedExecutor::Input::Input(cv::gimpl::GraphState &state,
const std::vector<RcDesc> &rcs)
: m_state(state) {
set(rcs);
};
cv::GRunArgP cv::gimpl::GThreadedExecutor::Output::get(int idx) {
std::lock_guard<std::mutex> lock{m_state.m};
auto r = magazine::getObjPtrExec(m_state.mag, desc()[idx]);
// Remember the output port for this output object
m_out_idx[cv::gimpl::proto::ptr(r)] = idx;
return r;
}
void cv::gimpl::GThreadedExecutor::Output::post(cv::GRunArgP&&, const std::exception_ptr& e) {
if (e) {
m_eptr = e;
}
}
void cv::gimpl::GThreadedExecutor::Output::post(Exception&& ex) {
m_eptr = std::move(ex.eptr);
}
void cv::gimpl::GThreadedExecutor::Output::meta(const GRunArgP &out, const GRunArg::Meta &m) {
const auto idx = m_out_idx.at(cv::gimpl::proto::ptr(out));
std::lock_guard<std::mutex> lock{m_state.m};
magazine::assignMetaStubExec(m_state.mag, desc()[idx], m);
}
cv::gimpl::GThreadedExecutor::Output::Output(cv::gimpl::GraphState &state,
const std::vector<RcDesc> &rcs)
: m_state(state) {
set(rcs);
}
void cv::gimpl::GThreadedExecutor::Output::verify() {
if (m_eptr) {
std::rethrow_exception(m_eptr);
}
}
void cv::gimpl::GThreadedExecutor::initResource(const ade::NodeHandle &nh, const ade::NodeHandle &orig_nh) {
const Data &d = m_gm.metadata(orig_nh).get<Data>();
if ( d.storage != Data::Storage::INTERNAL
&& d.storage != Data::Storage::CONST_VAL) {
return;
}
// INTERNALS+CONST only! no need to allocate/reset output objects
// to as it is bound externally (e.g. already in the m_state.mag)
switch (d.shape) {
case GShape::GMAT: {
// Let island allocate it's outputs if it can,
// allocate cv::Mat and wrap it with RMat otherwise
GAPI_Assert(!nh->inNodes().empty());
const auto desc = util::get<cv::GMatDesc>(d.meta);
auto& exec = m_gim.metadata(nh->inNodes().front()).get<IslandExec>().object;
auto& rmat = m_state.mag.slot<cv::RMat>()[d.rc];
if (exec->allocatesOutputs()) {
rmat = exec->allocate(desc);
} else {
Mat mat;
createMat(desc, mat);
rmat = make_rmat<RMatOnMat>(mat);
}
}
break;
case GShape::GSCALAR:
if (d.storage == Data::Storage::CONST_VAL) {
auto rc = RcDesc{d.rc, d.shape, d.ctor};
magazine::bindInArg(m_state.mag, rc, m_gm.metadata(orig_nh).get<ConstValue>().arg);
}
break;
case GShape::GARRAY:
if (d.storage == Data::Storage::CONST_VAL) {
auto rc = RcDesc{d.rc, d.shape, d.ctor};
magazine::bindInArg(m_state.mag, rc, m_gm.metadata(orig_nh).get<ConstValue>().arg);
}
break;
case GShape::GOPAQUE:
// Constructed on Reset, do nothing here
break;
case GShape::GFRAME: {
// Should be defined by backend, do nothing here
break;
}
default:
GAPI_Error("InternalError");
}
}
cv::gimpl::IslandActor::IslandActor(const std::vector<RcDesc> &in_objects,
const std::vector<RcDesc> &out_objects,
std::shared_ptr<GIslandExecutable> isl_exec,
cv::gimpl::GraphState &state)
: m_isl_exec(isl_exec),
m_inputs(state, in_objects),
m_outputs(state, out_objects) {
}
void cv::gimpl::IslandActor::run() {
m_isl_exec->run(m_inputs, m_outputs);
}
void cv::gimpl::IslandActor::verify() {
m_outputs.verify();
};
class cv::gimpl::Task {
friend class TaskManager;
public:
using Ptr = std::shared_ptr<Task>;
Task(TaskManager::F&& f, std::vector<Task::Ptr> &&producers);
struct ExecutionState {
cv::gapi::own::ThreadPool& tp;
cv::gapi::own::Latch& latch;
};
void run(ExecutionState& state);
bool isLast() const { return m_consumers.empty(); }
void reset() { m_ready_producers.store(0u); }
private:
TaskManager::F m_f;
const uint32_t m_num_producers;
std::atomic<uint32_t> m_ready_producers;
std::vector<Task*> m_consumers;
};
cv::gimpl::Task::Task(TaskManager::F &&f,
std::vector<Task::Ptr> &&producers)
: m_f(std::move(f)),
m_num_producers(static_cast<uint32_t>(producers.size())) {
for (auto producer : producers) {
producer->m_consumers.push_back(this);
}
}
void cv::gimpl::Task::run(ExecutionState& state) {
// Execute the task
m_f();
// Notify every consumer about completion one of its dependencies
for (auto* consumer : m_consumers) {
const auto num_ready =
consumer->m_ready_producers.fetch_add(1, std::memory_order_relaxed) + 1;
// The last completed producer schedule the consumer for execution
if (num_ready == consumer->m_num_producers) {
state.tp.schedule([&state, consumer](){
consumer->run(state);
});
}
}
// If tasks has no consumers this is the last task
// Execution lasts until all last tasks are completed
// Decrement the latch to notify about completion
if (isLast()) {
state.latch.count_down();
}
}
std::shared_ptr<cv::gimpl::Task>
cv::gimpl::TaskManager::createTask(cv::gimpl::TaskManager::F &&f,
std::vector<std::shared_ptr<cv::gimpl::Task>> &&producers) {
const bool is_initial = producers.empty();
auto task = std::make_shared<cv::gimpl::Task>(std::move(f),
std::move(producers));
m_all_tasks.emplace_back(task);
if (is_initial) {
m_initial_tasks.emplace_back(task);
}
return task;
}
void cv::gimpl::TaskManager::scheduleAndWait(cv::gapi::own::ThreadPool& tp) {
// Reset the number of ready dependencies for all tasks
for (auto& task : m_all_tasks) { task->reset(); }
// Count the number of last tasks
auto isLast = [](const std::shared_ptr<Task>& task) { return task->isLast(); };
const auto kNumLastsTasks =
std::count_if(m_all_tasks.begin(), m_all_tasks.end(), isLast);
// Initialize the latch, schedule initial tasks
// and wait until all lasts tasks are done
cv::gapi::own::Latch latch(kNumLastsTasks);
Task::ExecutionState state{tp, latch};
for (auto task : m_initial_tasks) {
state.tp.schedule([&state, task](){ task->run(state); });
}
latch.wait();
}
cv::gimpl::GThreadedExecutor::GThreadedExecutor(const uint32_t num_threads,
std::unique_ptr<ade::Graph> &&g_model)
: GAbstractExecutor(std::move(g_model)),
m_thread_pool(num_threads) {
auto sorted = m_gim.metadata().get<ade::passes::TopologicalSortData>();
std::unordered_map< ade::NodeHandle
, std::shared_ptr<Task>
, ade::HandleHasher<ade::Node>> m_tasks_map;
for (auto nh : sorted.nodes())
{
switch (m_gim.metadata(nh).get<NodeKind>().k)
{
case NodeKind::ISLAND:
{
std::vector<RcDesc> input_rcs;
std::vector<RcDesc> output_rcs;
input_rcs.reserve(nh->inNodes().size());
output_rcs.reserve(nh->outNodes().size());
auto xtract = [&](ade::NodeHandle slot_nh, std::vector<RcDesc> &vec) {
const auto orig_data_nh
= m_gim.metadata(slot_nh).get<DataSlot>().original_data_node;
const auto &orig_data_info
= m_gm.metadata(orig_data_nh).get<Data>();
vec.emplace_back(RcDesc{ orig_data_info.rc
, orig_data_info.shape
, orig_data_info.ctor});
};
for (auto in_slot_nh : nh->inNodes()) xtract(in_slot_nh, input_rcs);
for (auto out_slot_nh : nh->outNodes()) xtract(out_slot_nh, output_rcs);
auto actor = std::make_shared<IslandActor>(std::move(input_rcs),
std::move(output_rcs),
m_gim.metadata(nh).get<IslandExec>().object,
m_state);
m_actors.push_back(actor);
std::unordered_set<ade::NodeHandle, ade::HandleHasher<ade::Node>> producer_nhs;
for (auto slot_nh : nh->inNodes()) {
for (auto island_nh : slot_nh->inNodes()) {
GAPI_Assert(m_gim.metadata(island_nh).get<NodeKind>().k == NodeKind::ISLAND);
producer_nhs.emplace(island_nh);
}
}
std::vector<std::shared_ptr<Task>> producers;
producers.reserve(producer_nhs.size());
for (auto producer_nh : producer_nhs) {
producers.push_back(m_tasks_map.at(producer_nh));
}
auto task = m_task_manager.createTask(
[actor](){actor->run();}, std::move(producers));
m_tasks_map.emplace(nh, task);
}
break;
case NodeKind::SLOT:
{
const auto orig_data_nh
= m_gim.metadata(nh).get<DataSlot>().original_data_node;
initResource(nh, orig_data_nh);
m_slots.emplace_back(DataDesc{nh, orig_data_nh});
}
break;
default:
GAPI_Error("InternalError");
break;
} // switch(kind)
} // for(gim nodes)
prepareForNewStream();
}
void cv::gimpl::GThreadedExecutor::run(cv::gimpl::GRuntimeArgs &&args) {
const auto proto = m_gm.metadata().get<Protocol>();
// Basic check if input/output arguments are correct
// FIXME: Move to GCompiled (do once for all GExecutors)
if (proto.inputs.size() != args.inObjs.size()) { // TODO: Also check types
util::throw_error(std::logic_error
("Computation's input protocol doesn\'t "
"match actual arguments!"));
}
if (proto.outputs.size() != args.outObjs.size()) { // TODO: Also check types
util::throw_error(std::logic_error
("Computation's output protocol doesn\'t "
"match actual arguments!"));
}
namespace util = ade::util;
// ensure that output Mat parameters are correctly allocated
// FIXME: avoid copy of NodeHandle and GRunRsltComp ?
for (auto index : util::iota(proto.out_nhs.size())) {
auto& nh = proto.out_nhs.at(index);
const Data &d = m_gm.metadata(nh).get<Data>();
if (d.shape == GShape::GMAT) {
using cv::util::get;
const auto desc = get<cv::GMatDesc>(d.meta);
auto check_rmat = [&desc, &args, &index]() {
auto& out_mat = *get<cv::RMat*>(args.outObjs.at(index));
GAPI_Assert(desc.canDescribe(out_mat));
};
#if !defined(GAPI_STANDALONE)
// Building as part of OpenCV - follow OpenCV behavior In
// the case of cv::Mat if output buffer is not enough to
// hold the result, reallocate it
if (cv::util::holds_alternative<cv::Mat*>(args.outObjs.at(index))) {
auto& out_mat = *get<cv::Mat*>(args.outObjs.at(index));
createMat(desc, out_mat);
}
// In the case of RMat check to fit required meta
else {
check_rmat();
}
#else
// Building standalone - output buffer should always exist,
// and _exact_ match our inferred metadata
if (cv::util::holds_alternative<cv::Mat*>(args.outObjs.at(index))) {
auto& out_mat = *get<cv::Mat*>(args.outObjs.at(index));
GAPI_Assert(out_mat.data != nullptr &&
desc.canDescribe(out_mat));
}
// In the case of RMat check to fit required meta
else {
check_rmat();
}
#endif // !defined(GAPI_STANDALONE)
}
}
// Update storage with user-passed objects
for (auto it : ade::util::zip(ade::util::toRange(proto.inputs),
ade::util::toRange(args.inObjs))) {
magazine::bindInArgExec(m_state.mag, std::get<0>(it), std::get<1>(it));
}
for (auto it : ade::util::zip(ade::util::toRange(proto.outputs),
ade::util::toRange(args.outObjs))) {
magazine::bindOutArgExec(m_state.mag, std::get<0>(it), std::get<1>(it));
}
// Reset internal data
for (auto &sd : m_slots) {
const auto& data = m_gm.metadata(sd.data_nh).get<Data>();
magazine::resetInternalData(m_state.mag, data);
}
m_task_manager.scheduleAndWait(m_thread_pool);
for (auto actor : m_actors) {
actor->verify();
}
for (auto it : ade::util::zip(ade::util::toRange(proto.outputs),
ade::util::toRange(args.outObjs))) {
magazine::writeBackExec(m_state.mag, std::get<0>(it), std::get<1>(it));
}
}
bool cv::gimpl::GThreadedExecutor::canReshape() const {
for (auto actor : m_actors) {
if (actor->exec()->canReshape()) {
return false;
}
}
return true;
}
void cv::gimpl::GThreadedExecutor::reshape(const GMetaArgs& inMetas, const GCompileArgs& args) {
GAPI_Assert(canReshape());
auto& g = *m_orig_graph.get();
ade::passes::PassContext ctx{g};
passes::initMeta(ctx, inMetas);
passes::inferMeta(ctx, true);
// NB: Before reshape islands need to re-init resources for every slot.
for (auto slot : m_slots) {
initResource(slot.slot_nh, slot.data_nh);
}
for (auto actor : m_actors) {
actor->exec()->reshape(g, args);
}
}
void cv::gimpl::GThreadedExecutor::prepareForNewStream() {
for (auto actor : m_actors) {
actor->exec()->handleNewStream();
}
}

@ -0,0 +1,123 @@
// This file is part of OpenCV project.
// It is subject to the license terms in the LICENSE file found in the top-level directory
// of this distribution and at http://opencv.org/license.html.
//
// Copyright (C) 2024 Intel Corporation
#ifndef OPENCV_GAPI_GTHREADEDEXECUTOR_HPP
#define OPENCV_GAPI_GTHREADEDEXECUTOR_HPP
#include <utility> // tuple, required by magazine
#include <unordered_map> // required by magazine
#include "executor/gabstractexecutor.hpp"
#include "executor/thread_pool.hpp"
namespace cv {
namespace gimpl {
class Task;
class TaskManager {
public:
using F = std::function<void()>;
std::shared_ptr<Task> createTask(F &&f, std::vector<std::shared_ptr<Task>> &&producers);
void scheduleAndWait(cv::gapi::own::ThreadPool& tp);
private:
std::vector<std::shared_ptr<Task>> m_all_tasks;
std::vector<std::shared_ptr<Task>> m_initial_tasks;
};
struct GraphState {
Mag mag;
std::mutex m;
};
class IslandActor;
class GThreadedExecutor final: public GAbstractExecutor {
public:
class Input;
class Output;
explicit GThreadedExecutor(const uint32_t num_threads,
std::unique_ptr<ade::Graph> &&g_model);
void run(cv::gimpl::GRuntimeArgs &&args) override;
bool canReshape() const override;
void reshape(const GMetaArgs& inMetas, const GCompileArgs& args) override;
void prepareForNewStream() override;
private:
struct DataDesc
{
ade::NodeHandle slot_nh;
ade::NodeHandle data_nh;
};
void initResource(const ade::NodeHandle &nh, const ade::NodeHandle &orig_nh);
GraphState m_state;
std::vector<DataDesc> m_slots;
cv::gapi::own::ThreadPool m_thread_pool;
TaskManager m_task_manager;
std::vector<std::shared_ptr<IslandActor>> m_actors;
};
class GThreadedExecutor::Input final: public GIslandExecutable::IInput
{
public:
Input(GraphState& state, const std::vector<RcDesc> &rcs);
private:
virtual StreamMsg get() override;
virtual StreamMsg try_get() override { return get(); }
private:
GraphState& m_state;
};
class GThreadedExecutor::Output final: public GIslandExecutable::IOutput
{
public:
Output(GraphState &state, const std::vector<RcDesc> &rcs);
void verify();
private:
GRunArgP get(int idx) override;
void post(cv::GRunArgP&&, const std::exception_ptr& e) override;
void post(Exception&& ex) override;
void post(EndOfStream&&) override {};
void meta(const GRunArgP &out, const GRunArg::Meta &m) override;
private:
GraphState& m_state;
std::unordered_map<const void*, int> m_out_idx;
std::exception_ptr m_eptr;
};
class IslandActor {
public:
using Ptr = std::shared_ptr<IslandActor>;
IslandActor(const std::vector<RcDesc> &in_objects,
const std::vector<RcDesc> &out_objects,
std::shared_ptr<GIslandExecutable> isl_exec,
GraphState &state);
void run();
void verify();
std::shared_ptr<GIslandExecutable> exec() { return m_isl_exec; }
private:
std::shared_ptr<GIslandExecutable> m_isl_exec;
GThreadedExecutor::Input m_inputs;
GThreadedExecutor::Output m_outputs;
};
} // namespace gimpl
} // namespace cv
#endif // OPENCV_GAPI_GTHREADEDEXECUTOR_HPP

@ -0,0 +1,67 @@
// This file is part of OpenCV project.
// It is subject to the license terms in the LICENSE file found in the top-level directory
// of this distribution and at http://opencv.org/license.html.
//
// Copyright (C) 2024 Intel Corporation
#include "thread_pool.hpp"
#include <opencv2/gapi/util/throw.hpp>
cv::gapi::own::Latch::Latch(const uint64_t expected)
: m_expected(expected) {
}
void cv::gapi::own::Latch::count_down() {
std::lock_guard<std::mutex> lk{m_mutex};
--m_expected;
if (m_expected == 0) {
m_all_done.notify_all();
}
}
void cv::gapi::own::Latch::wait() {
std::unique_lock<std::mutex> lk{m_mutex};
while (m_expected != 0u) {
m_all_done.wait(lk);
}
}
cv::gapi::own::ThreadPool::ThreadPool(const uint32_t num_workers) {
m_workers.reserve(num_workers);
for (uint32_t i = 0; i < num_workers; ++i) {
m_workers.emplace_back(
cv::gapi::own::ThreadPool::worker, std::ref(m_queue));
}
}
void cv::gapi::own::ThreadPool::worker(QueueClass<Task>& queue) {
while (true) {
cv::gapi::own::ThreadPool::Task task;
queue.pop(task);
if (!task) {
break;
}
task();
}
}
void cv::gapi::own::ThreadPool::schedule(cv::gapi::own::ThreadPool::Task&& task) {
m_queue.push(std::move(task));
};
void cv::gapi::own::ThreadPool::shutdown() {
for (size_t i = 0; i < m_workers.size(); ++i) {
// NB: Empty task - is an indicator for workers to stop their loops
m_queue.push({});
}
for (auto& worker : m_workers) {
worker.join();
}
m_workers.clear();
}
cv::gapi::own::ThreadPool::~ThreadPool() {
shutdown();
}

@ -0,0 +1,71 @@
// This file is part of OpenCV project.
// It is subject to the license terms in the LICENSE file found in the top-level directory
// of this distribution and at http://opencv.org/license.html.
//
// Copyright (C) 2024 Intel Corporation
#ifndef OPENCV_GAPI_THREAD_POOL_HPP
#define OPENCV_GAPI_THREAD_POOL_HPP
#include <functional>
#include <vector>
#include <thread>
#include <mutex>
#include <atomic>
#include <condition_variable>
#include <opencv2/gapi/own/exports.hpp> // GAPI_EXPORTS
#if defined(HAVE_TBB)
# include <tbb/concurrent_queue.h> // FIXME: drop it from here!
template<typename T> using QueueClass = tbb::concurrent_bounded_queue<T>;
#else
# include "executor/conc_queue.hpp"
template<typename T> using QueueClass = cv::gapi::own::concurrent_bounded_queue<T>;
#endif // TBB
namespace cv {
namespace gapi {
namespace own {
// NB: Only for tests
class GAPI_EXPORTS Latch {
public:
explicit Latch(const uint64_t expected);
Latch(const Latch&) = delete;
Latch& operator=(const Latch&) = delete;
void count_down();
void wait();
private:
uint64_t m_expected;
std::mutex m_mutex;
std::condition_variable m_all_done;
};
// NB: Only for tests
class GAPI_EXPORTS ThreadPool {
public:
using Task = std::function<void()>;
explicit ThreadPool(const uint32_t num_workers);
ThreadPool(const ThreadPool&) = delete;
ThreadPool& operator=(const ThreadPool&) = delete;
void schedule(Task&& task);
~ThreadPool();
private:
static void worker(QueueClass<Task>& queue);
void shutdown();
private:
std::vector<std::thread> m_workers;
QueueClass<Task> m_queue;
};
}}} // namespace cv::gapi::own
#endif // OPENCV_GAPI_THREAD_POOL_HPP

@ -13,6 +13,8 @@
#include <opencv2/gapi/core.hpp>
#include "executor/thread_pool.hpp"
namespace opencv_test
{
@ -67,6 +69,38 @@ namespace
}
};
G_TYPED_KERNEL(GBusyWait, <GMat(GMat, uint32_t)>, "org.busy_wait") {
static GMatDesc outMeta(GMatDesc in, uint32_t)
{
return in;
}
};
GAPI_OCV_KERNEL(GOCVBusyWait, GBusyWait)
{
static void run(const cv::Mat& in,
const uint32_t time_in_ms,
cv::Mat& out)
{
using namespace std::chrono;
auto s = high_resolution_clock::now();
in.copyTo(out);
auto e = high_resolution_clock::now();
const auto elapsed_in_ms =
static_cast<int32_t>(duration_cast<milliseconds>(e-s).count());
int32_t diff = time_in_ms - elapsed_in_ms;
const auto need_to_wait_in_ms = static_cast<uint32_t>(std::max(0, diff));
s = high_resolution_clock::now();
e = s;
while (duration_cast<milliseconds>(e-s).count() < need_to_wait_in_ms) {
e = high_resolution_clock::now();
}
}
};
// These definitions test the correct macro work if the kernel has multiple output values
G_TYPED_KERNEL(GRetGArrayTupleOfGMat2Kernel, <GArray<std::tuple<GMat, GMat>>(GMat, Scalar)>, "org.opencv.test.retarrayoftupleofgmat2kernel") {};
G_TYPED_KERNEL(GRetGArraTupleyOfGMat3Kernel, <GArray<std::tuple<GMat, GMat, GMat>>(GMat)>, "org.opencv.test.retarrayoftupleofgmat3kernel") {};
@ -513,4 +547,29 @@ TEST(GAPI_Pipeline, 1DMatWithinSingleIsland)
EXPECT_EQ(0, cv::norm(out_mat, ref_mat));
}
TEST(GAPI_Pipeline, BranchesExecutedInParallel)
{
cv::GMat in;
// NB: cv::gapi::copy used to prevent fusing OCV backend operations
// into the single island where they will be executed in turn
auto out0 = GBusyWait::on(cv::gapi::copy(in), 1000u /*1sec*/);
auto out1 = GBusyWait::on(cv::gapi::copy(in), 1000u /*1sec*/);
auto out2 = GBusyWait::on(cv::gapi::copy(in), 1000u /*1sec*/);
auto out3 = GBusyWait::on(cv::gapi::copy(in), 1000u /*1sec*/);
cv::GComputation comp(cv::GIn(in), cv::GOut(out0,out1,out2,out3));
cv::Mat in_mat = cv::Mat::eye(32, 32, CV_8UC1);
cv::Mat out_mat0, out_mat1, out_mat2, out_mat3;
using namespace std::chrono;
auto s = high_resolution_clock::now();
comp.apply(cv::gin(in_mat), cv::gout(out_mat0, out_mat1, out_mat2, out_mat3),
cv::compile_args(cv::use_threaded_executor(4u),
cv::gapi::kernels<GOCVBusyWait>()));
auto e = high_resolution_clock::now();
const auto elapsed_in_ms = duration_cast<milliseconds>(e-s).count();;
EXPECT_GE(1200u, elapsed_in_ms);
}
} // namespace opencv_test

@ -0,0 +1,124 @@
// This file is part of OpenCV project.
// It is subject to the license terms in the LICENSE file found in the top-level directory
// of this distribution and at http://opencv.org/license.html.
//
// Copyright (C) 2024 Intel Corporation
#include "../test_precomp.hpp"
#include <chrono>
#include <thread>
#include "executor/thread_pool.hpp"
namespace opencv_test
{
using namespace cv::gapi;
TEST(ThreadPool, ScheduleNotBlock)
{
own::Latch latch(1u);
std::atomic<uint32_t> counter{0u};
own::ThreadPool tp(4u);
tp.schedule([&](){
std::this_thread::sleep_for(std::chrono::milliseconds{500u});
counter++;
latch.count_down();
});
EXPECT_EQ(0u, counter);
latch.wait();
EXPECT_EQ(1u, counter);
}
TEST(ThreadPool, MultipleTasks)
{
const uint32_t kNumTasks = 100u;
own::Latch latch(kNumTasks);
std::atomic<uint32_t> completed{0u};
own::ThreadPool tp(4u);
for (uint32_t i = 0; i < kNumTasks; ++i) {
tp.schedule([&]() {
++completed;
latch.count_down();
});
}
latch.wait();
EXPECT_EQ(kNumTasks, completed.load());
}
struct ExecutionState {
ExecutionState(const uint32_t num_threads,
const uint32_t num_tasks)
: guard(0u),
critical(0u),
limit(num_tasks),
latch(num_threads),
tp(num_threads) {
}
std::atomic<uint32_t> guard;
std::atomic<uint32_t> critical;
const uint32_t limit;
own::Latch latch;
own::ThreadPool tp;
};
static void doRecursive(ExecutionState& state) {
// NB: Protects function to be executed no more than limit number of times
if (state.guard.fetch_add(1u) >= state.limit) {
state.latch.count_down();
return;
}
// NB: This simulates critical section
std::this_thread::sleep_for(std::chrono::milliseconds{50});
++state.critical;
// NB: Schedule the new one recursively
state.tp.schedule([&](){ doRecursive(state); });
}
TEST(ThreadPool, ScheduleRecursively)
{
const int kNumThreads = 5u;
const uint32_t kNumTasks = 100u;
ExecutionState state(kNumThreads, kNumTasks);
for (uint32_t i = 0; i < kNumThreads; ++i) {
state.tp.schedule([&](){
doRecursive(state);
});
}
state.latch.wait();
EXPECT_EQ(kNumTasks, state.critical.load());
}
TEST(ThreadPool, ExecutionIsParallel)
{
const uint32_t kNumThreads = 4u;
std::atomic<uint32_t> counter{0};
own::Latch latch{kNumThreads};
own::ThreadPool tp(kNumThreads);
auto start = std::chrono::high_resolution_clock::now();
for (uint32_t i = 0; i < kNumThreads; ++i) {
tp.schedule([&]() {
std::this_thread::sleep_for(std::chrono::milliseconds{800u});
++counter;
latch.count_down();
});
}
latch.wait();
auto end = std::chrono::high_resolution_clock::now();
auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count();
EXPECT_GE(1000u, elapsed);
EXPECT_EQ(kNumThreads, counter.load());
}
} // namespace opencv_test
Loading…
Cancel
Save