Merge pull request #19487 from TolyaTalamanov:at/support-nireq-option

[G-API] Support multiple asynchronous requests

* Support nireq option

* Disable tests to check CI

* Fix bug with hanging

* WA to green CI

* Snapshot

* Simplify RequestPool

* Add default values to id

* Fix win warning
pull/19637/head
Anatoliy Talamanov 4 years ago committed by GitHub
parent c7f03814ea
commit 28c064f345
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 19
      modules/gapi/include/opencv2/gapi/infer/ie.hpp
  2. 394
      modules/gapi/src/backends/ie/giebackend.cpp
  3. 47
      modules/gapi/src/backends/ie/giebackend.hpp
  4. 12
      modules/gapi/src/executor/gstreamingexecutor.cpp
  5. 31
      modules/gapi/test/infer/gapi_infer_ie_test.cpp

@ -67,6 +67,9 @@ namespace detail {
Kind kind;
bool is_generic;
IEConfig config;
// NB: Number of asyncrhonious infer requests
size_t nireq;
};
} // namespace detail
@ -91,7 +94,8 @@ public:
, std::tuple_size<typename Net::OutArgs>::value // num_out
, detail::ParamDesc::Kind::Load
, false
, {}} {
, {}
, 1u} {
};
Params(const std::string &model,
@ -101,7 +105,8 @@ public:
, std::tuple_size<typename Net::OutArgs>::value // num_out
, detail::ParamDesc::Kind::Import
, false
, {}} {
, {}
, 1u} {
};
Params<Net>& cfgInputLayers(const typename PortCfg<Net>::In &ll) {
@ -137,6 +142,12 @@ public:
return *this;
}
Params& cfgNumRequests(size_t nireq) {
GAPI_Assert(nireq > 0 && "Number of infer requests must be greater than zero!");
desc.nireq = nireq;
return *this;
}
// BEGIN(G-API's network parametrization API)
GBackend backend() const { return cv::gapi::ie::backend(); }
std::string tag() const { return Net::tag(); }
@ -154,13 +165,13 @@ public:
const std::string &model,
const std::string &weights,
const std::string &device)
: desc{ model, weights, device, {}, {}, {}, 0u, 0u, detail::ParamDesc::Kind::Load, true, {}}, m_tag(tag) {
: desc{ model, weights, device, {}, {}, {}, 0u, 0u, detail::ParamDesc::Kind::Load, true, {}, 1u}, m_tag(tag) {
};
Params(const std::string &tag,
const std::string &model,
const std::string &device)
: desc{ model, {}, device, {}, {}, {}, 0u, 0u, detail::ParamDesc::Kind::Import, true, {}}, m_tag(tag) {
: desc{ model, {}, device, {}, {}, {}, 0u, 0u, detail::ParamDesc::Kind::Import, true, {}, 1u}, m_tag(tag) {
};
Params& pluginConfig(IEConfig&& cfg) {

@ -50,6 +50,14 @@
#include "ie_compound_blob.h"
#endif
#if defined(HAVE_TBB)
# include <tbb/concurrent_queue.h> // FIXME: drop it from here!
template<typename T> using QueueClass = tbb::concurrent_bounded_queue<T>;
#else
# include "executor/conc_queue.hpp"
template<typename T> using QueueClass = cv::gapi::own::concurrent_bounded_queue<T>;
#endif // TBB
namespace IE = InferenceEngine;
namespace {
@ -254,17 +262,7 @@ struct IEUnit {
non_const_this->this_network = cv::gimpl::ie::wrap::loadNetwork(non_const_this->this_plugin, net, params);
}
auto this_request = non_const_this->this_network.CreateInferRequest();
// Bind const data to infer request
for (auto &&p : params.const_inputs) {
// FIXME: SetBlob is known to be inefficient,
// it is worth to make a customizable "initializer" and pass the
// cv::Mat-wrapped blob there to support IE's optimal "GetBlob idiom"
// Still, constant data is to set only once.
this_request.SetBlob(p.first, wrapIE(p.second.first, p.second.second));
}
return {this_plugin, this_network, this_request};
return {params, this_plugin, this_network};
}
};
@ -273,7 +271,6 @@ class IECallContext
public:
IECallContext(const IEUnit & unit,
cv::gimpl::GIslandExecutable::IOutput & output,
cv::gimpl::ie::SyncPrim & sync,
const cv::GArgs & args,
const std::vector<cv::gimpl::RcDesc> & outs,
std::vector<cv::gimpl::GIslandExecutable::InObj> && input_objs,
@ -302,7 +299,6 @@ public:
const IEUnit &uu;
cv::gimpl::GIslandExecutable::IOutput &out;
cv::gimpl::ie::SyncPrim &sync;
// NB: Need to gurantee that MediaFrame::View don't die until request is over.
using Views = std::vector<std::unique_ptr<cv::MediaFrame::View>>;
@ -333,13 +329,11 @@ private:
IECallContext::IECallContext(const IEUnit & unit,
cv::gimpl::GIslandExecutable::IOutput & output,
cv::gimpl::ie::SyncPrim & syncp,
const cv::GArgs & args,
const std::vector<cv::gimpl::RcDesc> & outs,
std::vector<cv::gimpl::GIslandExecutable::InObj> && input_objs,
std::vector<cv::gimpl::GIslandExecutable::OutObj> && output_objs)
: uu(unit), out(output), sync(syncp), m_input_objs(std::move(input_objs)),
m_output_objs(std::move(output_objs))
: uu(unit), out(output), m_input_objs(std::move(input_objs)), m_output_objs(std::move(output_objs))
{
for (auto& it : m_input_objs) cv::gimpl::magazine::bindInArg (m_res, it.first, it.second);
for (auto& it : m_output_objs) cv::gimpl::magazine::bindOutArg(m_res, it.first, it.second);
@ -429,7 +423,7 @@ cv::GArg IECallContext::packArg(const cv::GArg &arg) {
struct IECallable {
static const char *name() { return "IERequestCallable"; }
using Run = std::function<void(cv::gimpl::ie::IECompiled&, std::shared_ptr<IECallContext>)>;
using Run = std::function<void(std::shared_ptr<IECallContext>, cv::gimpl::ie::RequestPool&)>;
Run run;
};
@ -480,6 +474,97 @@ inline IE::Blob::Ptr extractBlob(IECallContext& ctx, std::size_t i) {
}
} // anonymous namespace
std::vector<InferenceEngine::InferRequest> cv::gimpl::ie::IECompiled::createInferRequests() {
std::vector<InferenceEngine::InferRequest> requests;
requests.reserve(params.nireq);
for (size_t i = 0; i < params.nireq; ++i) {
requests.push_back(this_network.CreateInferRequest());
auto& request = requests.back();
// Bind const data to infer request
for (auto &&p : params.const_inputs) {
// FIXME: SetBlob is known to be inefficient,
// it is worth to make a customizable "initializer" and pass the
// cv::Mat-wrapped blob there to support IE's optimal "GetBlob idiom"
// Still, constant data is to set only once.
request.SetBlob(p.first, wrapIE(p.second.first, p.second.second));
}
}
return requests;
}
class cv::gimpl::ie::RequestPool {
public:
using RunF = std::function<void(InferenceEngine::InferRequest&)>;
using CallbackF = std::function<void(InferenceEngine::InferRequest&)>;
// NB: The task is represented by:
// RunF - function which is set blobs and run async inference.
// CallbackF - function which is obtain output blobs and post it to output.
struct Task {
RunF run;
CallbackF callback;
};
explicit RequestPool(std::vector<InferenceEngine::InferRequest>&& requests);
void execute(Task&& t, bool async = true);
void waitAndShutdown();
private:
void callback(Task task, InferenceEngine::InferRequest& request, size_t id);
QueueClass<size_t> m_idle_ids;
std::vector<InferenceEngine::InferRequest> m_requests;
};
// RequestPool implementation //////////////////////////////////////////////
cv::gimpl::ie::RequestPool::RequestPool(std::vector<InferenceEngine::InferRequest>&& requests)
: m_requests(std::move(requests)) {
for (size_t i = 0; i < m_requests.size(); ++i) {
m_idle_ids.push(i);
}
}
void cv::gimpl::ie::RequestPool::execute(cv::gimpl::ie::RequestPool::Task&& t, bool async) {
size_t id = 0u;
m_idle_ids.pop(id);
auto& request = m_requests[id];
// FIXME: This WA should be removed after supporting async mode for InferList and Infer2.
// InferList and Infer2 work synchronously without calling callback,
// therefore don't release InferRequest idle id.
if (!async) {
// NB: Synchronous execution.
t.run(request);
// NB: Explicitly call callback to release id.
callback(t, request, id);
return;
}
request.SetCompletionCallback(
std::bind(&cv::gimpl::ie::RequestPool::callback, this, t, std::ref(request), id));
t.run(request);
}
void cv::gimpl::ie::RequestPool::callback(cv::gimpl::ie::RequestPool::Task task,
InferenceEngine::InferRequest& request,
size_t id) {
task.callback(request);
m_idle_ids.push(id);
}
// NB: Not thread-safe.
void cv::gimpl::ie::RequestPool::waitAndShutdown() {
// NB: It will be blocked if at least one request is busy.
for (size_t i = 0; i < m_requests.size(); ++i) {
size_t id = 0u;
m_idle_ids.pop(id);
}
}
// GCPUExcecutable implementation //////////////////////////////////////////////
cv::gimpl::ie::GIEExecutable::GIEExecutable(const ade::Graph &g,
const std::vector<ade::NodeHandle> &nodes)
@ -494,6 +579,7 @@ cv::gimpl::ie::GIEExecutable::GIEExecutable(const ade::Graph &g,
if (this_nh == nullptr) {
this_nh = nh;
this_iec = iem.metadata(this_nh).get<IEUnit>().compile();
m_reqPool.reset(new RequestPool(this_iec.createInferRequests()));
}
else
util::throw_error(std::logic_error("Multi-node inference is not supported!"));
@ -518,27 +604,26 @@ cv::gimpl::ie::GIEExecutable::GIEExecutable(const ade::Graph &g,
void cv::gimpl::ie::GIEExecutable::run(cv::gimpl::GIslandExecutable::IInput &in,
cv::gimpl::GIslandExecutable::IOutput &out) {
// General alghoritm:
// 1. Since only single async request is supported
// wait until it is over and start collecting new data.
// 2. Collect island inputs/outputs.
// 3. Create kernel context. (Every kernel has his own context.)
// 4. Go to the next frame without waiting until the async request is over (1)
// 1. Collect island inputs/outputs.
// 2. Create kernel context. (Every kernel has his own context).
// 3. If the EndOfStream message is recieved, wait until all passed task are done.
// 4.
// 5.1 Run the kernel.
// 5.2 Kernel wait for all nececcary infer requests and start asynchronous execution.
// 5.3 After the kernel is finished continue processing next frame.
//
// 5. If graph is compiled in non-streaming mode, wait until request is over.
// (1) To prevent data race on the IOutput object, need to wait
// for async request callback, which post outputs and only after that get new data.
m_sync.wait();
// 5. If graph is compiled in non-streaming mode, wait until all tasks are done.
std::vector<InObj> input_objs;
std::vector<OutObj> output_objs;
const auto &in_desc = in.desc();
const auto &out_desc = out.desc();
const auto in_msg = in.get();
if (cv::util::holds_alternative<cv::gimpl::EndOfStream>(in_msg))
{
// (3) Wait until all passed task are done.
m_reqPool->waitAndShutdown();
out.post(cv::gimpl::EndOfStream{});
return;
}
@ -546,14 +631,16 @@ void cv::gimpl::ie::GIEExecutable::run(cv::gimpl::GIslandExecutable::IInput &in
GAPI_Assert(cv::util::holds_alternative<cv::GRunArgs>(in_msg));
const auto in_vector = cv::util::get<cv::GRunArgs>(in_msg);
// (2) Collect inputs/outputs
// (1) Collect island inputs/outputs
input_objs.reserve(in_desc.size());
output_objs.reserve(out_desc.size());
for (auto &&it: ade::util::zip(ade::util::toRange(in_desc),
ade::util::toRange(in_vector)))
{
input_objs.emplace_back(std::get<0>(it), std::get<1>(it));
}
const auto &out_desc = out.desc();
output_objs.reserve(out_desc.size());
for (auto &&it: ade::util::indexed(ade::util::toRange(out_desc)))
{
output_objs.emplace_back(ade::util::value(it),
@ -563,22 +650,19 @@ void cv::gimpl::ie::GIEExecutable::run(cv::gimpl::GIslandExecutable::IInput &in
GConstGIEModel giem(m_g);
const auto &uu = giem.metadata(this_nh).get<IEUnit>();
const auto &op = m_gm.metadata(this_nh).get<Op>();
// (3) Create kernel context
auto context = std::make_shared<IECallContext>(uu, out, m_sync, op.args, op.outs,
// (2) Create kernel context
auto ctx = std::make_shared<IECallContext>(uu, out, op.args, op.outs,
std::move(input_objs), std::move(output_objs));
// (5) Run the kernel and start handle next frame.
const auto &kk = giem.metadata(this_nh).get<IECallable>();
// FIXME: Running just a single node now.
// Not sure if need to support many of them, though
// FIXME: Make this island-unmergeable?
kk.run(this_iec, context);
// (6) In not-streaming mode need to wait until the async request is over
// (4) Run the kernel.
kk.run(ctx, *m_reqPool);
// (5) In non-streaming mode need to wait until the all tasks are done
// FIXME: Is there more graceful way to handle this case ?
if (!m_gm.metadata().contains<Streaming>()) {
m_sync.wait();
m_reqPool->waitAndShutdown();
}
}
@ -616,54 +700,16 @@ static void configureInputInfo(const IE::InputInfo::Ptr& ii, const cv::GMetaArg
// NB: This is a callback used by async infer
// to post outputs blobs (cv::GMat's).
struct PostOutputs {
// NB: Should be const to pass into SetCompletionCallback
void operator()() const {
for (auto i : ade::util::iota(ctx->uu.params.num_out)) {
static void PostOutputs(InferenceEngine::InferRequest &request,
std::shared_ptr<IECallContext> ctx) {
for (auto i : ade::util::iota(ctx->uu.params.num_out))
{
auto& out_mat = ctx->outMatR(i);
IE::Blob::Ptr this_blob = iec.this_request.GetBlob(ctx->uu.params.output_names[i]);
IE::Blob::Ptr this_blob = request.GetBlob(ctx->uu.params.output_names[i]);
copyFromIE(this_blob, out_mat);
ctx->out.post(ctx->output(i));
}
ctx->sync.release_and_notify();
}
IECompiled &iec ;
std::shared_ptr<IECallContext> ctx ;
};
// NB: This is a callback used by async infer
// to post output list of blobs (cv::GArray<cv::GMat>).
struct PostOutputsList {
// NB: Should be const to pass into SetCompletionCallback
void operator()() const {
for (auto i : ade::util::iota(ctx->uu.params.num_out)) {
std::vector<cv::Mat> &out_vec = ctx->outVecR<cv::Mat>(i);
IE::Blob::Ptr out_blob = iec.this_request.GetBlob(ctx->uu.params.output_names[i]);
cv::Mat out_mat(cached_dims[i], toCV(out_blob->getTensorDesc().getPrecision()));
// FIXME: Avoid data copy. Not sure if it is possible though
copyFromIE(out_blob, out_mat);
out_vec.push_back(std::move(out_mat));
}
// NB: Callbacks run synchronously yet, so the lock isn't necessary
auto&& out_vec_size = ctx->outVecR<cv::Mat>(0).size();
// NB: Now output vector is collected and can be posted to output
if (nrequests == out_vec_size) {
for (auto i : ade::util::iota(ctx->uu.params.num_out)) {
ctx->out.post(ctx->output(i));
}
}
ctx->sync.release_and_notify();
}
IECompiled &iec ;
std::shared_ptr<IECallContext> ctx ;
std::vector< std::vector<int> > cached_dims;
size_t nrequests;
};
}
struct Infer: public cv::detail::KernelTag {
using API = cv::GInferBase;
@ -715,7 +761,12 @@ struct Infer: public cv::detail::KernelTag {
return result;
}
static void run(IECompiled &iec, std::shared_ptr<IECallContext> ctx) {
static void run(std::shared_ptr<IECallContext> ctx,
cv::gimpl::ie::RequestPool &reqPool) {
using namespace std::placeholders;
reqPool.execute(
cv::gimpl::ie::RequestPool::Task {
[ctx](InferenceEngine::InferRequest &req) {
// non-generic version for now:
// - assumes all inputs/outputs are always Mats
for (auto i : ade::util::iota(ctx->uu.params.num_in)) {
@ -723,15 +774,15 @@ struct Infer: public cv::detail::KernelTag {
// and redirect our data producers to this memory
// (A memory dialog comes to the picture again)
IE::Blob::Ptr this_blob = extractBlob(*ctx, i);
iec.this_request.SetBlob(ctx->uu.params.input_names[i], this_blob);
req.SetBlob(ctx->uu.params.input_names[i], this_blob);
}
iec.this_request.SetCompletionCallback(PostOutputs{iec, ctx});
// NB: Since only single async request is supported, need to lock other
// attempts to get access while request is working.
ctx->sync.acquire();
iec.this_request.StartAsync();
// FIXME: Should it be done by kernel ?
// What about to do that in RequestPool ?
req.StartAsync();
},
std::bind(PostOutputs, _1, ctx)
}
);
}
};
@ -776,22 +827,27 @@ struct InferROI: public cv::detail::KernelTag {
return result;
}
static void run(IECompiled &iec, std::shared_ptr<IECallContext> ctx) {
// non-generic version for now, per the InferROI's definition
static void run(std::shared_ptr<IECallContext> ctx,
cv::gimpl::ie::RequestPool &reqPool) {
using namespace std::placeholders;
reqPool.execute(
cv::gimpl::ie::RequestPool::Task {
[ctx](InferenceEngine::InferRequest &req) {
GAPI_Assert(ctx->uu.params.num_in == 1);
const auto& this_roi = ctx->inArg<cv::detail::OpaqueRef>(0).rref<cv::Rect>();
auto&& this_roi = ctx->inArg<cv::detail::OpaqueRef>(0).rref<cv::Rect>();
IE::Blob::Ptr this_blob = extractBlob(*ctx, 1u);
IE::Blob::Ptr this_blob = extractBlob(*ctx, 1);
iec.this_request.SetBlob(*(ctx->uu.params.input_names.begin()),
req.SetBlob(*(ctx->uu.params.input_names.begin()),
IE::make_shared_blob(this_blob, toIE(this_roi)));
iec.this_request.SetCompletionCallback(PostOutputs{iec, ctx});
// NB: Since only single async request is supported, need to lock other
// attempts to get access while request is working.
ctx->sync.acquire();
iec.this_request.StartAsync();
// FIXME: Should it be done by kernel ?
// What about to do that in RequestPool ?
req.StartAsync();
},
std::bind(PostOutputs, _1, ctx)
}
);
}
};
@ -834,53 +890,64 @@ struct InferList: public cv::detail::KernelTag {
cv::GMetaArg{cv::empty_array_desc()});
}
static void run(IECompiled &iec, std::shared_ptr<IECallContext> ctx) {
static void run(std::shared_ptr<IECallContext> ctx,
cv::gimpl::ie::RequestPool &reqPool) {
using namespace std::placeholders;
reqPool.execute(
cv::gimpl::ie::RequestPool::Task {
[ctx](InferenceEngine::InferRequest &req) {
// non-generic version for now:
// - assumes zero input is always ROI list
// - assumes all inputs/outputs are always Mats
GAPI_Assert(ctx->uu.params.num_in == 1); // roi list is not counted in net's inputs
const auto& in_roi_vec = ctx->inArg<cv::detail::VectorRef>(0u).rref<cv::Rect>();
// NB: In case there is no input data need to post output anyway
if (in_roi_vec.empty()) {
for (auto i : ade::util::iota(ctx->uu.params.num_out)) {
ctx->out.post(ctx->output(i));
}
return;
}
IE::Blob::Ptr this_blob = extractBlob(*ctx, 1u);
IE::Blob::Ptr this_blob = extractBlob(*ctx, 1);
// FIXME: This could be done ONCE at graph compile stage!
std::vector< std::vector<int> > cached_dims(ctx->uu.params.num_out);
std::vector<std::vector<int>> cached_dims(ctx->uu.params.num_out);
for (auto i : ade::util::iota(ctx->uu.params.num_out)) {
const IE::DataPtr& ie_out = ctx->uu.outputs.at(ctx->uu.params.output_names[i]);
cached_dims[i] = toCV(ie_out->getTensorDesc().getDims());
ctx->outVecR<cv::Mat>(i).clear();
// FIXME: Isn't this should be done automatically
// by some resetInternalData(), etc? (Probably at the GExecutor level)
}
// NB: If list of roi is empty need to post output data anyway.
if (in_roi_vec.empty()) {
for (auto i : ade::util::iota(ctx->uu.params.num_out)) {
ctx->out.post(ctx->output(i));
}
return;
ctx->outVecR<cv::Mat>(i).clear();
}
for (auto&& rc : in_roi_vec) {
// NB: Only single async request is supported now,
// so need to wait until previos iteration is over.
// However there is no need to wait async request from last iteration,
// this will be done by backend.
ctx->sync.wait();
IE::Blob::Ptr roi_blob = IE::make_shared_blob(this_blob, toIE(rc));
iec.this_request.SetBlob(ctx->uu.params.input_names[0u], roi_blob);
req.SetBlob(ctx->uu.params.input_names[0u], roi_blob);
req.Infer();
for (auto i : ade::util::iota(ctx->uu.params.num_out)) {
std::vector<cv::Mat> &out_vec = ctx->outVecR<cv::Mat>(i);
iec.this_request.SetCompletionCallback(
PostOutputsList{iec, ctx, cached_dims, in_roi_vec.size()});
IE::Blob::Ptr out_blob = req.GetBlob(ctx->uu.params.output_names[i]);
// NB: Since only single async request is supported, need to lock other
// attempts to get access while request is working.
ctx->sync.acquire();
iec.this_request.StartAsync();
cv::Mat out_mat(cached_dims[i], toCV(out_blob->getTensorDesc().getPrecision()));
// FIXME: Avoid data copy. Not sure if it is possible though
copyFromIE(out_blob, out_mat);
out_vec.push_back(std::move(out_mat));
}
}
for (auto i : ade::util::iota(ctx->uu.params.num_out)) {
ctx->out.post(ctx->output(i));
}
},
[](InferenceEngine::InferRequest &) { /* do nothing */ }
},
false /* not async */
);
}
};
struct InferList2: public cv::detail::KernelTag {
@ -966,41 +1033,42 @@ struct InferList2: public cv::detail::KernelTag {
cv::GMetaArg{cv::empty_array_desc()});
}
static void run(IECompiled &iec, std::shared_ptr<IECallContext> ctx) {
static void run(std::shared_ptr<IECallContext> ctx,
cv::gimpl::ie::RequestPool &reqPool) {
reqPool.execute(
cv::gimpl::ie::RequestPool::Task {
[ctx](InferenceEngine::InferRequest &req) {
GAPI_Assert(ctx->inArgs().size() > 1u
&& "This operation must have at least two arguments");
IE::Blob::Ptr blob_0 = extractBlob(*ctx, 0u);
IE::Blob::Ptr blob_0 = extractBlob(*ctx, 0);
// Take the next argument, which must be vector (of any kind).
// Use it only to obtain the ROI list size (sizes of all other
// vectors must be equal to this one)
const auto list_size = ctx->inArg<cv::detail::VectorRef>(1u).size();
if (list_size == 0u) {
for (auto i : ade::util::iota(ctx->uu.params.num_out)) {
ctx->out.post(ctx->output(i));
}
return;
}
for (auto i : ade::util::iota(ctx->uu.params.num_out)) {
ctx->outVecR<cv::Mat>(i).resize(list_size);
}
// FIXME: This could be done ONCE at graph compile stage!
std::vector< std::vector<int> > cached_dims(ctx->uu.params.num_out);
for (auto i : ade::util::iota(ctx->uu.params.num_out)) {
const IE::DataPtr& ie_out = ctx->uu.outputs.at(ctx->uu.params.output_names[i]);
cached_dims[i] = toCV(ie_out->getTensorDesc().getDims());
ctx->outVecR<cv::Mat>(i).clear();
// FIXME: Isn't this should be done automatically
// by some resetInternalData(), etc? (Probably at the GExecutor level)
}
// NB: If list of roi is empty need to post output data anyway.
if (list_size == 0u) {
for (auto i : ade::util::iota(ctx->uu.params.num_out)) {
ctx->out.post(ctx->output(i));
}
return;
ctx->outVecR<cv::Mat>(i).clear();
}
for (const auto &list_idx : ade::util::iota(list_size)) {
// NB: Only single async request is supported now,
// so need to wait until previos iteration is over.
// However there is no need to wait async request from last iteration,
// this will be done by backend.
ctx->sync.wait();
for (auto in_idx : ade::util::iota(ctx->uu.params.num_in)) {
const auto &this_vec = ctx->inArg<cv::detail::VectorRef>(in_idx+1u);
GAPI_Assert(this_vec.size() == list_size);
@ -1013,19 +1081,35 @@ struct InferList2: public cv::detail::KernelTag {
const auto &mat = vec[list_idx];
this_blob = wrapIE(mat, cv::gapi::ie::TraitAs::TENSOR);
} else {
GAPI_Assert(false && "Only Rect and Mat types are supported for infer list 2!");
GAPI_Assert(false &&
"Only Rect and Mat types are supported for infer list 2!");
}
iec.this_request.SetBlob(ctx->uu.params.input_names[in_idx], this_blob);
req.SetBlob(ctx->uu.params.input_names[in_idx], this_blob);
}
iec.this_request.SetCompletionCallback(
PostOutputsList{iec, ctx, cached_dims, list_size});
req.Infer();
for (auto i : ade::util::iota(ctx->uu.params.num_out)) {
std::vector<cv::Mat> &out_vec = ctx->outVecR<cv::Mat>(i);
IE::Blob::Ptr out_blob = req.GetBlob(ctx->uu.params.output_names[i]);
// NB: Since only single async request is supported, need to lock other
// attempts to get access while request is working.
ctx->sync.acquire();
iec.this_request.StartAsync();
cv::Mat out_mat(cached_dims[i], toCV(out_blob->getTensorDesc().getPrecision()));
// FIXME: Avoid data copy. Not sure if it is possible though
copyFromIE(out_blob, out_mat);
out_vec.push_back(std::move(out_mat));
}
}
for (auto i : ade::util::iota(ctx->uu.params.num_out)) {
ctx->out.post(ctx->output(i));
}
},
[](InferenceEngine::InferRequest &) { /* do nothing */ }
},
false /* not async */
);
}
};

@ -24,50 +24,21 @@
#include "backends/common/gbackend.hpp"
#include "compiler/gislandmodel.hpp"
#include "backends/ie/giebackend/giewrapper.hpp" // wrap::Plugin
namespace cv {
namespace gimpl {
namespace ie {
struct IECompiled {
#if INF_ENGINE_RELEASE < 2019020000 // < 2019.R2
InferenceEngine::InferencePlugin this_plugin;
#else
InferenceEngine::Core this_core;
#endif
std::vector<InferenceEngine::InferRequest> createInferRequests();
cv::gapi::ie::detail::ParamDesc params;
cv::gimpl::ie::wrap::Plugin this_plugin;
InferenceEngine::ExecutableNetwork this_network;
InferenceEngine::InferRequest this_request;
};
// FIXME: Structure which collect all necessary sync primitives
// will be deleted when the async request pool appears
class SyncPrim {
public:
void wait() {
std::unique_lock<std::mutex> l(m_mutex);
m_cv.wait(l, [this]{ return !m_is_busy; });
}
void release_and_notify() {
{
std::lock_guard<std::mutex> lock(m_mutex);
m_is_busy = false;
}
m_cv.notify_one();
}
void acquire() {
std::lock_guard<std::mutex> lock(m_mutex);
m_is_busy = true;
}
private:
// To wait until the async request isn't over
std::condition_variable m_cv;
// To avoid spurious cond var wake up
bool m_is_busy = false;
// To sleep until condition variable wakes up
std::mutex m_mutex;
};
class RequestPool;
class GIEExecutable final: public GIslandExecutable
{
@ -82,8 +53,8 @@ class GIEExecutable final: public GIslandExecutable
// List of all resources in graph (both internal and external)
std::vector<ade::NodeHandle> m_dataNodes;
// Sync primitive
SyncPrim m_sync;
// To manage multiple async requests
std::unique_ptr<RequestPool> m_reqPool;
public:
GIEExecutable(const ade::Graph &graph,

@ -7,7 +7,6 @@
#include "precomp.hpp"
#include <memory> // make_shared
#include <iostream>
#include <ade/util/zip_range.hpp>
@ -583,10 +582,16 @@ class StreamingOutput final: public cv::gimpl::GIslandExecutable::IOutput
std::vector< std::vector<Q*> > &m_out_queues;
std::shared_ptr<cv::gimpl::GIslandExecutable> m_island;
// NB: StreamingOutput have to be thread-safe.
// Now synchronization approach is quite poor and inefficient.
mutable std::mutex m_mutex;
// Allocate a new data object for output under idx
// Prepare this object for posting
virtual cv::GRunArgP get(int idx) override
{
std::lock_guard<std::mutex> lock{m_mutex};
using MatType = cv::Mat;
using SclType = cv::Scalar;
@ -663,6 +668,8 @@ class StreamingOutput final: public cv::gimpl::GIslandExecutable::IOutput
}
virtual void post(cv::GRunArgP&& argp) override
{
std::lock_guard<std::mutex> lock{m_mutex};
// Mark the output ready for posting. If it is the first in the line,
// actually post it and all its successors which are ready for posting too.
auto it = m_postIdx.find(cv::gimpl::proto::ptr(argp));
@ -700,6 +707,7 @@ class StreamingOutput final: public cv::gimpl::GIslandExecutable::IOutput
}
virtual void post(cv::gimpl::EndOfStream&&) override
{
std::lock_guard<std::mutex> lock{m_mutex};
// If the posting list is empty, just broadcast the stop message.
// If it is not, enqueue the Stop message in the postings list.
for (auto &&it : ade::util::indexed(m_postings))
@ -725,6 +733,7 @@ class StreamingOutput final: public cv::gimpl::GIslandExecutable::IOutput
}
void meta(const cv::GRunArgP &out, const cv::GRunArg::Meta &m) override
{
std::lock_guard<std::mutex> lock{m_mutex};
const auto it = m_postIdx.find(cv::gimpl::proto::ptr(out));
GAPI_Assert(it != m_postIdx.end());
@ -747,6 +756,7 @@ public:
bool done() const
{
std::lock_guard<std::mutex> lock{m_mutex};
// The streaming actor work is considered DONE for this stream
// when it posted/resent all STOP messages to all its outputs.
return m_stops_sent == desc().size();

@ -953,6 +953,16 @@ TEST_F(ROIListNV12, Infer2MediaInputNV12)
validate();
}
TEST(Infer, SetInvalidNumberOfRequests)
{
using AGInfo = std::tuple<cv::GMat, cv::GMat>;
G_API_NET(AgeGender, <AGInfo(cv::GMat)>, "test-age-gender");
cv::gapi::ie::Params<AgeGender> pp{"model", "weights", "device"};
EXPECT_ANY_THROW(pp.cfgNumRequests(0u));
}
TEST(Infer, TestStreamingInfer)
{
initTestDataPath();
@ -980,7 +990,8 @@ TEST(Infer, TestStreamingInfer)
auto pp = cv::gapi::ie::Params<AgeGender> {
params.model_path, params.weights_path, params.device_id
}.cfgOutputLayers({ "age_conv3", "prob" });
}.cfgOutputLayers({ "age_conv3", "prob" })
.cfgNumRequests(4u);
std::size_t num_frames = 0u;
@ -1049,7 +1060,8 @@ TEST(InferROI, TestStreamingInfer)
auto pp = cv::gapi::ie::Params<AgeGender> {
params.model_path, params.weights_path, params.device_id
}.cfgOutputLayers({ "age_conv3", "prob" });
}.cfgOutputLayers({ "age_conv3", "prob" })
.cfgNumRequests(4u);
std::size_t num_frames = 0u;
@ -1131,7 +1143,8 @@ TEST(InferList, TestStreamingInfer)
auto pp = cv::gapi::ie::Params<AgeGender> {
params.model_path, params.weights_path, params.device_id
}.cfgOutputLayers({ "age_conv3", "prob" });
}.cfgOutputLayers({ "age_conv3", "prob" })
.cfgNumRequests(4u);
std::size_t num_frames = 0u;
@ -1222,8 +1235,8 @@ TEST(Infer2, TestStreamingInfer)
auto pp = cv::gapi::ie::Params<AgeGender> {
params.model_path, params.weights_path, params.device_id
}.cfgOutputLayers({ "age_conv3", "prob" });
}.cfgOutputLayers({ "age_conv3", "prob" })
.cfgNumRequests(4u);
std::size_t num_frames = 0u;
std::size_t max_frames = 10u;
@ -1311,8 +1324,8 @@ TEST(InferEmptyList, TestStreamingInfer)
auto pp = cv::gapi::ie::Params<AgeGender> {
params.model_path, params.weights_path, params.device_id
}.cfgOutputLayers({ "age_conv3", "prob" });
}.cfgOutputLayers({ "age_conv3", "prob" })
.cfgNumRequests(4u);
std::size_t num_frames = 0u;
std::size_t max_frames = 1u;
@ -1366,8 +1379,8 @@ TEST(Infer2EmptyList, TestStreamingInfer)
auto pp = cv::gapi::ie::Params<AgeGender> {
params.model_path, params.weights_path, params.device_id
}.cfgOutputLayers({ "age_conv3", "prob" });
}.cfgOutputLayers({ "age_conv3", "prob" })
.cfgNumRequests(4u);
std::size_t num_frames = 0u;
std::size_t max_frames = 1u;

Loading…
Cancel
Save