From 28c064f3459debe8707eb6d2817a080f442e8e7c Mon Sep 17 00:00:00 2001
From: Anatoliy Talamanov
Date: Fri, 26 Feb 2021 15:53:30 +0300
Subject: [PATCH] Merge pull request #19487 from
 TolyaTalamanov:at/support-nireq-option

[G-API] Support multiple asynchronous requests

* Support nireq option
* Disable tests to check CI
* Fix bug with hanging
* WA to green CI
* Snapshot
* Simplify RequestPool
* Add default values to id
* Fix win warning
---
 .../gapi/include/opencv2/gapi/infer/ie.hpp    |  19 +-
 modules/gapi/src/backends/ie/giebackend.cpp   | 552 ++++++++++--------
 modules/gapi/src/backends/ie/giebackend.hpp   |  49 +-
 .../gapi/src/executor/gstreamingexecutor.cpp  |  12 +-
 .../gapi/test/infer/gapi_infer_ie_test.cpp    |  31 +-
 5 files changed, 376 insertions(+), 287 deletions(-)

diff --git a/modules/gapi/include/opencv2/gapi/infer/ie.hpp b/modules/gapi/include/opencv2/gapi/infer/ie.hpp
index 53e31fbb09..e1df80f1d0 100644
--- a/modules/gapi/include/opencv2/gapi/infer/ie.hpp
+++ b/modules/gapi/include/opencv2/gapi/infer/ie.hpp
@@ -67,6 +67,9 @@ namespace detail {
     Kind kind;
     bool is_generic;
     IEConfig config;
+
+    // NB: Number of asynchronous infer requests
+    size_t nireq;
 };
 } // namespace detail

@@ -91,7 +94,8 @@ public:
               , std::tuple_size<typename Net::OutArgs>::value // num_out
               , detail::ParamDesc::Kind::Load
               , false
-              , {}} {
+              , {}
+              , 1u} {
     };

     Params(const std::string &model,
@@ -101,7 +105,8 @@ public:
               , std::tuple_size<typename Net::OutArgs>::value // num_out
               , detail::ParamDesc::Kind::Import
               , false
-              , {}} {
+              , {}
+              , 1u} {
     };

     Params<Net>& cfgInputLayers(const typename PortCfg<Net>::In &ll) {
@@ -137,6 +142,12 @@ public:
         return *this;
     }

+    Params<Net>& cfgNumRequests(size_t nireq) {
+        GAPI_Assert(nireq > 0 && "Number of infer requests must be greater than zero!");
+        desc.nireq = nireq;
+        return *this;
+    }
+
     // BEGIN(G-API's network parametrization API)
     GBackend    backend() const { return cv::gapi::ie::backend(); }
     std::string tag()     const { return Net::tag(); }
@@ -154,13 +165,13 @@ public:
            const std::string &model,
            const std::string &weights,
            const std::string &device)
-        : desc{ model, weights, device, {}, {}, {}, 0u, 0u, detail::ParamDesc::Kind::Load, true, {}}, m_tag(tag) {
+        : desc{ model, weights, device, {}, {}, {}, 0u, 0u, detail::ParamDesc::Kind::Load, true, {}, 1u}, m_tag(tag) {
     };

     Params(const std::string &tag,
            const std::string &model,
            const std::string &device)
-        : desc{ model, {}, device, {}, {}, {}, 0u, 0u, detail::ParamDesc::Kind::Import, true, {}}, m_tag(tag) {
+        : desc{ model, {}, device, {}, {}, {}, 0u, 0u, detail::ParamDesc::Kind::Import, true, {}, 1u}, m_tag(tag) {
     };

     Params& pluginConfig(IEConfig&& cfg) {
diff --git a/modules/gapi/src/backends/ie/giebackend.cpp b/modules/gapi/src/backends/ie/giebackend.cpp
index 949c803270..79997820fd 100644
--- a/modules/gapi/src/backends/ie/giebackend.cpp
+++ b/modules/gapi/src/backends/ie/giebackend.cpp
@@ -50,6 +50,14 @@
 #include "ie_compound_blob.h"
 #endif

+#if defined(HAVE_TBB)
+#  include <tbb/concurrent_queue.h> // FIXME: drop it from here!
+template using QueueClass = tbb::concurrent_bounded_queue; +#else +# include "executor/conc_queue.hpp" +template using QueueClass = cv::gapi::own::concurrent_bounded_queue; +#endif // TBB + namespace IE = InferenceEngine; namespace { @@ -254,17 +262,7 @@ struct IEUnit { non_const_this->this_network = cv::gimpl::ie::wrap::loadNetwork(non_const_this->this_plugin, net, params); } - auto this_request = non_const_this->this_network.CreateInferRequest(); - // Bind const data to infer request - for (auto &&p : params.const_inputs) { - // FIXME: SetBlob is known to be inefficient, - // it is worth to make a customizable "initializer" and pass the - // cv::Mat-wrapped blob there to support IE's optimal "GetBlob idiom" - // Still, constant data is to set only once. - this_request.SetBlob(p.first, wrapIE(p.second.first, p.second.second)); - } - - return {this_plugin, this_network, this_request}; + return {params, this_plugin, this_network}; } }; @@ -273,7 +271,6 @@ class IECallContext public: IECallContext(const IEUnit & unit, cv::gimpl::GIslandExecutable::IOutput & output, - cv::gimpl::ie::SyncPrim & sync, const cv::GArgs & args, const std::vector & outs, std::vector && input_objs, @@ -302,7 +299,6 @@ public: const IEUnit &uu; cv::gimpl::GIslandExecutable::IOutput &out; - cv::gimpl::ie::SyncPrim &sync; // NB: Need to gurantee that MediaFrame::View don't die until request is over. using Views = std::vector>; @@ -333,13 +329,11 @@ private: IECallContext::IECallContext(const IEUnit & unit, cv::gimpl::GIslandExecutable::IOutput & output, - cv::gimpl::ie::SyncPrim & syncp, const cv::GArgs & args, const std::vector & outs, std::vector && input_objs, std::vector && output_objs) -: uu(unit), out(output), sync(syncp), m_input_objs(std::move(input_objs)), - m_output_objs(std::move(output_objs)) +: uu(unit), out(output), m_input_objs(std::move(input_objs)), m_output_objs(std::move(output_objs)) { for (auto& it : m_input_objs) cv::gimpl::magazine::bindInArg (m_res, it.first, it.second); for (auto& it : m_output_objs) cv::gimpl::magazine::bindOutArg(m_res, it.first, it.second); @@ -355,12 +349,12 @@ IECallContext::IECallContext(const IEUnit & return arg.get().shape; }); - for (const auto out_it : ade::util::indexed(outs)) { - // FIXME: Can the same GArg type resolution mechanism be reused here? - const auto port = ade::util::index(out_it); - const auto desc = ade::util::value(out_it); - m_results[port] = cv::gimpl::magazine::getObjPtr(m_res, desc); - } + for (const auto out_it : ade::util::indexed(outs)) { + // FIXME: Can the same GArg type resolution mechanism be reused here? 
+        const auto port = ade::util::index(out_it);
+        const auto desc = ade::util::value(out_it);
+        m_results[port] = cv::gimpl::magazine::getObjPtr(m_res, desc);
+    }
 }

 const cv::GArgs& IECallContext::inArgs() const {
@@ -429,7 +423,7 @@ cv::GArg IECallContext::packArg(const cv::GArg &arg) {

 struct IECallable {
     static const char *name() { return "IERequestCallable"; }
-    using Run = std::function<void(cv::gimpl::ie::IECompiled&, std::shared_ptr<IECallContext>)>;
+    using Run = std::function<void(std::shared_ptr<IECallContext>, cv::gimpl::ie::RequestPool&)>;
     Run run;
 };

@@ -480,6 +474,97 @@ inline IE::Blob::Ptr extractBlob(IECallContext& ctx, std::size_t i) {
 }
 } // anonymous namespace

+std::vector<InferenceEngine::InferRequest> cv::gimpl::ie::IECompiled::createInferRequests() {
+    std::vector<InferenceEngine::InferRequest> requests;
+    requests.reserve(params.nireq);
+
+    for (size_t i = 0; i < params.nireq; ++i) {
+        requests.push_back(this_network.CreateInferRequest());
+        auto& request = requests.back();
+        // Bind const data to infer request
+        for (auto &&p : params.const_inputs) {
+            // FIXME: SetBlob is known to be inefficient,
+            // it is worth to make a customizable "initializer" and pass the
+            // cv::Mat-wrapped blob there to support IE's optimal "GetBlob idiom"
+            // Still, constant data is to set only once.
+            request.SetBlob(p.first, wrapIE(p.second.first, p.second.second));
+        }
+    }
+
+    return requests;
+}
+
+class cv::gimpl::ie::RequestPool {
+public:
+    using RunF      = std::function<void(InferenceEngine::InferRequest&)>;
+    using CallbackF = std::function<void(InferenceEngine::InferRequest&)>;
+
+    // NB: The task is represented by:
+    // RunF      - a function which sets the input blobs and starts asynchronous inference.
+    // CallbackF - a function which obtains the output blobs and posts them to the output.
+    struct Task {
+        RunF run;
+        CallbackF callback;
+    };
+
+    explicit RequestPool(std::vector<InferenceEngine::InferRequest>&& requests);
+
+    void execute(Task&& t, bool async = true);
+    void waitAndShutdown();
+
+private:
+    void callback(Task task, InferenceEngine::InferRequest& request, size_t id);
+
+    QueueClass<size_t>                         m_idle_ids;
+    std::vector<InferenceEngine::InferRequest> m_requests;
+};
+
+// RequestPool implementation //////////////////////////////////////////////
+cv::gimpl::ie::RequestPool::RequestPool(std::vector<InferenceEngine::InferRequest>&& requests)
+    : m_requests(std::move(requests)) {
+    for (size_t i = 0; i < m_requests.size(); ++i) {
+        m_idle_ids.push(i);
+    }
+}
+
+void cv::gimpl::ie::RequestPool::execute(cv::gimpl::ie::RequestPool::Task&& t, bool async) {
+    size_t id = 0u;
+    m_idle_ids.pop(id);
+
+    auto& request = m_requests[id];
+
+    // FIXME: This WA should be removed after supporting async mode for InferList and Infer2.
+    // InferList and Infer2 work synchronously without calling callback,
+    // therefore don't release InferRequest idle id.
+    if (!async) {
+        // NB: Synchronous execution.
+        t.run(request);
+        // NB: Explicitly call callback to release id.
+        callback(t, request, id);
+        return;
+    }
+
+    request.SetCompletionCallback(
+            std::bind(&cv::gimpl::ie::RequestPool::callback, this, t, std::ref(request), id));
+    t.run(request);
+}
+
+void cv::gimpl::ie::RequestPool::callback(cv::gimpl::ie::RequestPool::Task task,
+                                          InferenceEngine::InferRequest& request,
+                                          size_t id) {
+    task.callback(request);
+    m_idle_ids.push(id);
+}
+
+// NB: Not thread-safe.
+void cv::gimpl::ie::RequestPool::waitAndShutdown() {
+    // NB: It will be blocked if at least one request is busy.
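
The idle-id scheme used by RequestPool can be read in isolation: a concurrent bounded queue holds the indices of free requests, execute() pops one (blocking while every request is busy) and the completion callback pushes it back once the outputs are published. Below is a minimal, self-contained sketch of the same pattern; Request, Pool and the mutex/condition-variable queue are simplified stand-ins for the InferenceEngine and TBB types, not the actual backend classes.

#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <vector>

// Stand-in for an asynchronous inference request: here the "completion"
// callback fires immediately inside StartAsync(), but the pool logic
// does not depend on that.
struct Request {
    std::function<void()> on_done;
    void SetCompletionCallback(std::function<void()> f) { on_done = std::move(f); }
    void StartAsync() { on_done(); }
};

class Pool {
public:
    struct Task {
        std::function<void(Request&)> run;       // set inputs, start the async work
        std::function<void(Request&)> callback;  // read outputs, post them downstream
    };

    explicit Pool(std::size_t n) : m_requests(n) {
        for (std::size_t i = 0; i < n; ++i) push(i);
    }

    void execute(Task t) {
        const std::size_t id = pop();            // blocks while every request is busy
        Request &r = m_requests[id];
        r.SetCompletionCallback([this, t, &r, id] {
            t.callback(r);                       // publish the results first...
            push(id);                            // ...then return the request to the pool
        });
        t.run(r);
    }

    void wait_all() {                            // drain: returns only when nothing is in flight
        for (std::size_t i = 0; i < m_requests.size(); ++i) pop();
    }

private:
    std::size_t pop() {
        std::unique_lock<std::mutex> l(m_mtx);
        m_cv.wait(l, [this] { return !m_idle.empty(); });
        const std::size_t id = m_idle.front();
        m_idle.pop();
        return id;
    }
    void push(std::size_t id) {
        { std::lock_guard<std::mutex> l(m_mtx); m_idle.push(id); }
        m_cv.notify_one();
    }

    std::vector<Request>    m_requests;
    std::queue<std::size_t> m_idle;
    std::mutex              m_mtx;
    std::condition_variable m_cv;
};

A kernel-side submission then looks like pool.execute({ [](Request &r){ /* set blobs */ r.StartAsync(); }, [](Request &r){ /* read blobs, post outputs */ } }), which mirrors the Task the backend builds below.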
+    for (size_t i = 0; i < m_requests.size(); ++i) {
+        size_t id = 0u;
+        m_idle_ids.pop(id);
+    }
+}
+
 // GCPUExcecutable implementation //////////////////////////////////////////////
 cv::gimpl::ie::GIEExecutable::GIEExecutable(const ade::Graph &g,
                                             const std::vector<ade::NodeHandle> &nodes)
@@ -494,6 +579,7 @@ cv::gimpl::ie::GIEExecutable::GIEExecutable(const ade::Graph &g,
             if (this_nh == nullptr) {
                 this_nh = nh;
                 this_iec = iem.metadata(this_nh).get<IEUnit>().compile();
+                m_reqPool.reset(new RequestPool(this_iec.createInferRequests()));
             }
             else
                 util::throw_error(std::logic_error("Multi-node inference is not supported!"));
@@ -518,27 +604,26 @@ cv::gimpl::ie::GIEExecutable::GIEExecutable(const ade::Graph &g,
 void cv::gimpl::ie::GIEExecutable::run(cv::gimpl::GIslandExecutable::IInput  &in,
                                        cv::gimpl::GIslandExecutable::IOutput &out) {
     // General alghoritm:
-    //     1. Since only single async request is supported
-    //        wait until it is over and start collecting new data.
-    //     2. Collect island inputs/outputs.
-    //     3. Create kernel context. (Every kernel has his own context.)
-    //     4. Go to the next frame without waiting until the async request is over (1)
+    //     1. Collect island inputs/outputs.
+    //     2. Create kernel context. (Every kernel has its own context).
+    //     3. If the EndOfStream message is received, wait until all submitted tasks are done.
+    //     4. Otherwise:
+    //        4.1 Run the kernel.
+    //        4.2 The kernel waits for the necessary infer requests to become available and starts asynchronous execution.
+    //        4.3 Once the kernel has returned, continue processing the next frame.
     //
-    //     5. If graph is compiled in non-streaming mode, wait until request is over.
-
-    // (1) To prevent data race on the IOutput object, need to wait
-    // for async request callback, which post outputs and only after that get new data.
-    m_sync.wait();
+    //     5. If graph is compiled in non-streaming mode, wait until all tasks are done.

     std::vector<InObj>  input_objs;
     std::vector<OutObj> output_objs;

     const auto &in_desc = in.desc();
-    const auto &out_desc = out.desc();

     const auto in_msg = in.get();

     if (cv::util::holds_alternative<cv::gimpl::EndOfStream>(in_msg))
     {
+        // (3) Wait until all submitted tasks are done.
+        m_reqPool->waitAndShutdown();
         out.post(cv::gimpl::EndOfStream{});
         return;
     }
@@ -546,39 +631,38 @@ void cv::gimpl::ie::GIEExecutable::run(cv::gimpl::GIslandExecutable::IInput &in
     GAPI_Assert(cv::util::holds_alternative<cv::GRunArgs>(in_msg));
     const auto in_vector = cv::util::get<cv::GRunArgs>(in_msg);

-    // (2) Collect inputs/outputs
+    // (1) Collect island inputs/outputs
     input_objs.reserve(in_desc.size());
-    output_objs.reserve(out_desc.size());
     for (auto &&it: ade::util::zip(ade::util::toRange(in_desc),
-                                   ade::util::toRange(in_vector)))
+                                   ade::util::toRange(in_vector))) {
         input_objs.emplace_back(std::get<0>(it), std::get<1>(it));
     }
+
+    const auto &out_desc = out.desc();
+    output_objs.reserve(out_desc.size());
     for (auto &&it: ade::util::indexed(ade::util::toRange(out_desc))) {
         output_objs.emplace_back(ade::util::value(it),
-                      out.get(ade::util::checked_cast<int>(ade::util::index(it))));
+                                 out.get(ade::util::checked_cast<int>(ade::util::index(it))));
     }

     GConstGIEModel giem(m_g);
     const auto &uu = giem.metadata(this_nh).get<IEUnit>();
     const auto &op = m_gm.metadata(this_nh).get<Op>();
-    // (3) Create kernel context
-    auto context = std::make_shared<IECallContext>(uu, out, m_sync, op.args, op.outs,
+    // (2) Create kernel context
+    auto ctx = std::make_shared<IECallContext>(uu, out, op.args, op.outs,
             std::move(input_objs), std::move(output_objs));
-
-    // (5) Run the kernel and start handle next frame.
     const auto &kk = giem.metadata(this_nh).get<IECallable>();

-    // FIXME: Running just a single node now.
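
The EndOfStream branch above relies on a simple ordering: first drain every in-flight request via waitAndShutdown(), only then post the end-of-stream message, so that no late callback writes to an already closed stream. The sketch below illustrates that barrier with std::async futures standing in for infer requests; it is an analogy only, not the backend code.

#include <chrono>
#include <future>
#include <iostream>
#include <thread>
#include <vector>

// Futures play the role of in-flight infer requests here; the real backend
// blocks on its idle-id queue in waitAndShutdown() instead of on futures.
int main() {
    std::vector<std::future<void>> in_flight;
    for (int i = 0; i < 4; ++i) {
        in_flight.push_back(std::async(std::launch::async, [i] {
            std::this_thread::sleep_for(std::chrono::milliseconds(10 * (i + 1)));
            std::cout << "request " << i << " finished\n";
        }));
    }
    for (auto &f : in_flight) f.wait();               // the drain barrier
    std::cout << "EndOfStream can be posted now\n";   // only after every callback has run
    return 0;
}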
- // Not sure if need to support many of them, though - // FIXME: Make this island-unmergeable? - kk.run(this_iec, context); - // (6) In not-streaming mode need to wait until the async request is over + // (4) Run the kernel. + kk.run(ctx, *m_reqPool); + + // (5) In non-streaming mode need to wait until the all tasks are done // FIXME: Is there more graceful way to handle this case ? if (!m_gm.metadata().contains()) { - m_sync.wait(); + m_reqPool->waitAndShutdown(); } } @@ -616,54 +700,16 @@ static void configureInputInfo(const IE::InputInfo::Ptr& ii, const cv::GMetaArg // NB: This is a callback used by async infer // to post outputs blobs (cv::GMat's). -struct PostOutputs { - // NB: Should be const to pass into SetCompletionCallback - void operator()() const { - for (auto i : ade::util::iota(ctx->uu.params.num_out)) { - auto& out_mat = ctx->outMatR(i); - IE::Blob::Ptr this_blob = iec.this_request.GetBlob(ctx->uu.params.output_names[i]); - copyFromIE(this_blob, out_mat); - ctx->out.post(ctx->output(i)); - } - ctx->sync.release_and_notify(); - } - - IECompiled &iec ; - std::shared_ptr ctx ; -}; - -// NB: This is a callback used by async infer -// to post output list of blobs (cv::GArray). -struct PostOutputsList { - // NB: Should be const to pass into SetCompletionCallback - void operator()() const { - for (auto i : ade::util::iota(ctx->uu.params.num_out)) { - std::vector &out_vec = ctx->outVecR(i); - - IE::Blob::Ptr out_blob = iec.this_request.GetBlob(ctx->uu.params.output_names[i]); - - cv::Mat out_mat(cached_dims[i], toCV(out_blob->getTensorDesc().getPrecision())); - // FIXME: Avoid data copy. Not sure if it is possible though - copyFromIE(out_blob, out_mat); - out_vec.push_back(std::move(out_mat)); - } - // NB: Callbacks run synchronously yet, so the lock isn't necessary - auto&& out_vec_size = ctx->outVecR(0).size(); - // NB: Now output vector is collected and can be posted to output - if (nrequests == out_vec_size) { - for (auto i : ade::util::iota(ctx->uu.params.num_out)) { - ctx->out.post(ctx->output(i)); - } - } - - ctx->sync.release_and_notify(); +static void PostOutputs(InferenceEngine::InferRequest &request, + std::shared_ptr ctx) { + for (auto i : ade::util::iota(ctx->uu.params.num_out)) + { + auto& out_mat = ctx->outMatR(i); + IE::Blob::Ptr this_blob = request.GetBlob(ctx->uu.params.output_names[i]); + copyFromIE(this_blob, out_mat); + ctx->out.post(ctx->output(i)); } - - IECompiled &iec ; - std::shared_ptr ctx ; - std::vector< std::vector > cached_dims; - size_t nrequests; -}; +} struct Infer: public cv::detail::KernelTag { using API = cv::GInferBase; @@ -715,23 +761,28 @@ struct Infer: public cv::detail::KernelTag { return result; } - static void run(IECompiled &iec, std::shared_ptr ctx) { - // non-generic version for now: - // - assumes all inputs/outputs are always Mats - for (auto i : ade::util::iota(ctx->uu.params.num_in)) { - // TODO: Ideally we shouldn't do SetBlob() but GetBlob() instead, - // and redirect our data producers to this memory - // (A memory dialog comes to the picture again) - IE::Blob::Ptr this_blob = extractBlob(*ctx, i); - iec.this_request.SetBlob(ctx->uu.params.input_names[i], this_blob); - } - - iec.this_request.SetCompletionCallback(PostOutputs{iec, ctx}); - - // NB: Since only single async request is supported, need to lock other - // attempts to get access while request is working. 
- ctx->sync.acquire(); - iec.this_request.StartAsync(); + static void run(std::shared_ptr ctx, + cv::gimpl::ie::RequestPool &reqPool) { + using namespace std::placeholders; + reqPool.execute( + cv::gimpl::ie::RequestPool::Task { + [ctx](InferenceEngine::InferRequest &req) { + // non-generic version for now: + // - assumes all inputs/outputs are always Mats + for (auto i : ade::util::iota(ctx->uu.params.num_in)) { + // TODO: Ideally we shouldn't do SetBlob() but GetBlob() instead, + // and redirect our data producers to this memory + // (A memory dialog comes to the picture again) + IE::Blob::Ptr this_blob = extractBlob(*ctx, i); + req.SetBlob(ctx->uu.params.input_names[i], this_blob); + } + // FIXME: Should it be done by kernel ? + // What about to do that in RequestPool ? + req.StartAsync(); + }, + std::bind(PostOutputs, _1, ctx) + } + ); } }; @@ -776,22 +827,27 @@ struct InferROI: public cv::detail::KernelTag { return result; } - static void run(IECompiled &iec, std::shared_ptr ctx) { - // non-generic version for now, per the InferROI's definition - GAPI_Assert(ctx->uu.params.num_in == 1); - const auto& this_roi = ctx->inArg(0).rref(); - - IE::Blob::Ptr this_blob = extractBlob(*ctx, 1u); - - iec.this_request.SetBlob(*(ctx->uu.params.input_names.begin()), - IE::make_shared_blob(this_blob, toIE(this_roi))); - - iec.this_request.SetCompletionCallback(PostOutputs{iec, ctx}); - - // NB: Since only single async request is supported, need to lock other - // attempts to get access while request is working. - ctx->sync.acquire(); - iec.this_request.StartAsync(); + static void run(std::shared_ptr ctx, + cv::gimpl::ie::RequestPool &reqPool) { + using namespace std::placeholders; + reqPool.execute( + cv::gimpl::ie::RequestPool::Task { + [ctx](InferenceEngine::InferRequest &req) { + GAPI_Assert(ctx->uu.params.num_in == 1); + auto&& this_roi = ctx->inArg(0).rref(); + + IE::Blob::Ptr this_blob = extractBlob(*ctx, 1); + + req.SetBlob(*(ctx->uu.params.input_names.begin()), + IE::make_shared_blob(this_blob, toIE(this_roi))); + + // FIXME: Should it be done by kernel ? + // What about to do that in RequestPool ? + req.StartAsync(); + }, + std::bind(PostOutputs, _1, ctx) + } + ); } }; @@ -834,52 +890,63 @@ struct InferList: public cv::detail::KernelTag { cv::GMetaArg{cv::empty_array_desc()}); } - static void run(IECompiled &iec, std::shared_ptr ctx) { - // non-generic version for now: - // - assumes zero input is always ROI list - // - assumes all inputs/outputs are always Mats - GAPI_Assert(ctx->uu.params.num_in == 1); // roi list is not counted in net's inputs - - const auto& in_roi_vec = ctx->inArg(0u).rref(); - - IE::Blob::Ptr this_blob = extractBlob(*ctx, 1u); - - // FIXME: This could be done ONCE at graph compile stage! - std::vector< std::vector > cached_dims(ctx->uu.params.num_out); - for (auto i : ade::util::iota(ctx->uu.params.num_out)) { - const IE::DataPtr& ie_out = ctx->uu.outputs.at(ctx->uu.params.output_names[i]); - cached_dims[i] = toCV(ie_out->getTensorDesc().getDims()); - ctx->outVecR(i).clear(); - // FIXME: Isn't this should be done automatically - // by some resetInternalData(), etc? (Probably at the GExecutor level) - } - - // NB: If list of roi is empty need to post output data anyway. - if (in_roi_vec.empty()) { - for (auto i : ade::util::iota(ctx->uu.params.num_out)) { - ctx->out.post(ctx->output(i)); - } - return; - } - - for (auto&& rc : in_roi_vec) { - // NB: Only single async request is supported now, - // so need to wait until previos iteration is over. 
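
The kernels above hand the completion step to the pool as std::bind(PostOutputs, _1, ctx): the infer request is supplied later as the bound callable's first argument, while the shared call context is captured up front and kept alive until the callback runs. A small self-contained illustration of that idiom follows; post_outputs, Ctx and the int result are placeholders, not the real G-API or InferenceEngine types.

#include <functional>
#include <iostream>
#include <memory>

struct Ctx { int frame_id = 7; };   // placeholder for the shared call context

// Plays the role of PostOutputs(request, ctx): the first argument arrives later.
static void post_outputs(int request_result, std::shared_ptr<Ctx> ctx) {
    std::cout << "frame " << ctx->frame_id << ": result = " << request_result << "\n";
}

int main() {
    using namespace std::placeholders;
    auto ctx = std::make_shared<Ctx>();
    // Same shape as SetCompletionCallback(std::bind(PostOutputs, _1, ctx)):
    std::function<void(int)> completion = std::bind(post_outputs, _1, ctx);
    completion(42);   // invoked "later" by whoever finishes the request
    return 0;
}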
- // However there is no need to wait async request from last iteration, - // this will be done by backend. - ctx->sync.wait(); - - IE::Blob::Ptr roi_blob = IE::make_shared_blob(this_blob, toIE(rc)); - iec.this_request.SetBlob(ctx->uu.params.input_names[0u], roi_blob); - - iec.this_request.SetCompletionCallback( - PostOutputsList{iec, ctx, cached_dims, in_roi_vec.size()}); - - // NB: Since only single async request is supported, need to lock other - // attempts to get access while request is working. - ctx->sync.acquire(); - iec.this_request.StartAsync(); - } + static void run(std::shared_ptr ctx, + cv::gimpl::ie::RequestPool &reqPool) { + + using namespace std::placeholders; + reqPool.execute( + cv::gimpl::ie::RequestPool::Task { + [ctx](InferenceEngine::InferRequest &req) { + // non-generic version for now: + // - assumes zero input is always ROI list + // - assumes all inputs/outputs are always Mats + const auto& in_roi_vec = ctx->inArg(0u).rref(); + // NB: In case there is no input data need to post output anyway + if (in_roi_vec.empty()) { + for (auto i : ade::util::iota(ctx->uu.params.num_out)) { + ctx->out.post(ctx->output(i)); + } + return; + } + + IE::Blob::Ptr this_blob = extractBlob(*ctx, 1); + + // FIXME: This could be done ONCE at graph compile stage! + std::vector> cached_dims(ctx->uu.params.num_out); + for (auto i : ade::util::iota(ctx->uu.params.num_out)) { + const IE::DataPtr& ie_out = ctx->uu.outputs.at(ctx->uu.params.output_names[i]); + cached_dims[i] = toCV(ie_out->getTensorDesc().getDims()); + // FIXME: Isn't this should be done automatically + // by some resetInternalData(), etc? (Probably at the GExecutor level) + ctx->outVecR(i).clear(); + } + + for (auto&& rc : in_roi_vec) { + IE::Blob::Ptr roi_blob = IE::make_shared_blob(this_blob, toIE(rc)); + req.SetBlob(ctx->uu.params.input_names[0u], roi_blob); + + req.Infer(); + + for (auto i : ade::util::iota(ctx->uu.params.num_out)) { + std::vector &out_vec = ctx->outVecR(i); + + IE::Blob::Ptr out_blob = req.GetBlob(ctx->uu.params.output_names[i]); + + cv::Mat out_mat(cached_dims[i], toCV(out_blob->getTensorDesc().getPrecision())); + // FIXME: Avoid data copy. Not sure if it is possible though + copyFromIE(out_blob, out_mat); + out_vec.push_back(std::move(out_mat)); + } + } + + for (auto i : ade::util::iota(ctx->uu.params.num_out)) { + ctx->out.post(ctx->output(i)); + } + }, + [](InferenceEngine::InferRequest &) { /* do nothing */ } + }, + false /* not async */ + ); } }; @@ -966,66 +1033,83 @@ struct InferList2: public cv::detail::KernelTag { cv::GMetaArg{cv::empty_array_desc()}); } - static void run(IECompiled &iec, std::shared_ptr ctx) { - GAPI_Assert(ctx->inArgs().size() > 1u - && "This operation must have at least two arguments"); - - IE::Blob::Ptr blob_0 = extractBlob(*ctx, 0u); - - // Take the next argument, which must be vector (of any kind). - // Use it only to obtain the ROI list size (sizes of all other - // vectors must be equal to this one) - const auto list_size = ctx->inArg(1u).size(); - - // FIXME: This could be done ONCE at graph compile stage! - std::vector< std::vector > cached_dims(ctx->uu.params.num_out); - for (auto i : ade::util::iota(ctx->uu.params.num_out)) { - const IE::DataPtr& ie_out = ctx->uu.outputs.at(ctx->uu.params.output_names[i]); - cached_dims[i] = toCV(ie_out->getTensorDesc().getDims()); - ctx->outVecR(i).clear(); - // FIXME: Isn't this should be done automatically - // by some resetInternalData(), etc? 
(Probably at the GExecutor level) - } - - // NB: If list of roi is empty need to post output data anyway. - if (list_size == 0u) { - for (auto i : ade::util::iota(ctx->uu.params.num_out)) { - ctx->out.post(ctx->output(i)); - } - return; - } - - for (const auto &list_idx : ade::util::iota(list_size)) { - // NB: Only single async request is supported now, - // so need to wait until previos iteration is over. - // However there is no need to wait async request from last iteration, - // this will be done by backend. - ctx->sync.wait(); - for (auto in_idx : ade::util::iota(ctx->uu.params.num_in)) { - const auto &this_vec = ctx->inArg(in_idx+1u); - GAPI_Assert(this_vec.size() == list_size); - IE::Blob::Ptr this_blob; - if (this_vec.getKind() == cv::detail::OpaqueKind::CV_RECT) { - const auto &vec = this_vec.rref(); - this_blob = IE::make_shared_blob(blob_0, toIE(vec[list_idx])); - } else if (this_vec.getKind() == cv::detail::OpaqueKind::CV_MAT) { - const auto &vec = this_vec.rref(); - const auto &mat = vec[list_idx]; - this_blob = wrapIE(mat, cv::gapi::ie::TraitAs::TENSOR); - } else { - GAPI_Assert(false && "Only Rect and Mat types are supported for infer list 2!"); - } - iec.this_request.SetBlob(ctx->uu.params.input_names[in_idx], this_blob); - } - - iec.this_request.SetCompletionCallback( - PostOutputsList{iec, ctx, cached_dims, list_size}); - - // NB: Since only single async request is supported, need to lock other - // attempts to get access while request is working. - ctx->sync.acquire(); - iec.this_request.StartAsync(); - } + static void run(std::shared_ptr ctx, + cv::gimpl::ie::RequestPool &reqPool) { + reqPool.execute( + cv::gimpl::ie::RequestPool::Task { + [ctx](InferenceEngine::InferRequest &req) { + GAPI_Assert(ctx->inArgs().size() > 1u + && "This operation must have at least two arguments"); + + IE::Blob::Ptr blob_0 = extractBlob(*ctx, 0); + + // Take the next argument, which must be vector (of any kind). + // Use it only to obtain the ROI list size (sizes of all other + // vectors must be equal to this one) + const auto list_size = ctx->inArg(1u).size(); + if (list_size == 0u) { + for (auto i : ade::util::iota(ctx->uu.params.num_out)) { + ctx->out.post(ctx->output(i)); + } + return; + } + + for (auto i : ade::util::iota(ctx->uu.params.num_out)) { + ctx->outVecR(i).resize(list_size); + } + + // FIXME: This could be done ONCE at graph compile stage! + std::vector< std::vector > cached_dims(ctx->uu.params.num_out); + for (auto i : ade::util::iota(ctx->uu.params.num_out)) { + const IE::DataPtr& ie_out = ctx->uu.outputs.at(ctx->uu.params.output_names[i]); + cached_dims[i] = toCV(ie_out->getTensorDesc().getDims()); + // FIXME: Isn't this should be done automatically + // by some resetInternalData(), etc? 
(Probably at the GExecutor level) + ctx->outVecR(i).clear(); + } + + for (const auto &list_idx : ade::util::iota(list_size)) { + for (auto in_idx : ade::util::iota(ctx->uu.params.num_in)) { + const auto &this_vec = ctx->inArg(in_idx+1u); + GAPI_Assert(this_vec.size() == list_size); + IE::Blob::Ptr this_blob; + if (this_vec.getKind() == cv::detail::OpaqueKind::CV_RECT) { + const auto &vec = this_vec.rref(); + this_blob = IE::make_shared_blob(blob_0, toIE(vec[list_idx])); + } else if (this_vec.getKind() == cv::detail::OpaqueKind::CV_MAT) { + const auto &vec = this_vec.rref(); + const auto &mat = vec[list_idx]; + this_blob = wrapIE(mat, cv::gapi::ie::TraitAs::TENSOR); + } else { + GAPI_Assert(false && + "Only Rect and Mat types are supported for infer list 2!"); + } + + req.SetBlob(ctx->uu.params.input_names[in_idx], this_blob); + } + + req.Infer(); + + for (auto i : ade::util::iota(ctx->uu.params.num_out)) { + std::vector &out_vec = ctx->outVecR(i); + + IE::Blob::Ptr out_blob = req.GetBlob(ctx->uu.params.output_names[i]); + + cv::Mat out_mat(cached_dims[i], toCV(out_blob->getTensorDesc().getPrecision())); + // FIXME: Avoid data copy. Not sure if it is possible though + copyFromIE(out_blob, out_mat); + out_vec.push_back(std::move(out_mat)); + } + } + + for (auto i : ade::util::iota(ctx->uu.params.num_out)) { + ctx->out.post(ctx->output(i)); + } + }, + [](InferenceEngine::InferRequest &) { /* do nothing */ } + }, + false /* not async */ + ); } }; diff --git a/modules/gapi/src/backends/ie/giebackend.hpp b/modules/gapi/src/backends/ie/giebackend.hpp index 87d3a99d2f..fbfeeccd61 100644 --- a/modules/gapi/src/backends/ie/giebackend.hpp +++ b/modules/gapi/src/backends/ie/giebackend.hpp @@ -24,51 +24,22 @@ #include "backends/common/gbackend.hpp" #include "compiler/gislandmodel.hpp" +#include "backends/ie/giebackend/giewrapper.hpp" // wrap::Plugin + namespace cv { namespace gimpl { namespace ie { struct IECompiled { -#if INF_ENGINE_RELEASE < 2019020000 // < 2019.R2 - InferenceEngine::InferencePlugin this_plugin; -#else - InferenceEngine::Core this_core; -#endif - InferenceEngine::ExecutableNetwork this_network; - InferenceEngine::InferRequest this_request; -}; - -// FIXME: Structure which collect all necessary sync primitives -// will be deleted when the async request pool appears -class SyncPrim { -public: - void wait() { - std::unique_lock l(m_mutex); - m_cv.wait(l, [this]{ return !m_is_busy; }); - } + std::vector createInferRequests(); - void release_and_notify() { - { - std::lock_guard lock(m_mutex); - m_is_busy = false; - } - m_cv.notify_one(); - } - - void acquire() { - std::lock_guard lock(m_mutex); - m_is_busy = true; - } - -private: - // To wait until the async request isn't over - std::condition_variable m_cv; - // To avoid spurious cond var wake up - bool m_is_busy = false; - // To sleep until condition variable wakes up - std::mutex m_mutex; + cv::gapi::ie::detail::ParamDesc params; + cv::gimpl::ie::wrap::Plugin this_plugin; + InferenceEngine::ExecutableNetwork this_network; }; +class RequestPool; + class GIEExecutable final: public GIslandExecutable { const ade::Graph &m_g; @@ -82,8 +53,8 @@ class GIEExecutable final: public GIslandExecutable // List of all resources in graph (both internal and external) std::vector m_dataNodes; - // Sync primitive - SyncPrim m_sync; + // To manage multiple async requests + std::unique_ptr m_reqPool; public: GIEExecutable(const ade::Graph &graph, diff --git a/modules/gapi/src/executor/gstreamingexecutor.cpp 
b/modules/gapi/src/executor/gstreamingexecutor.cpp
index c1de2e6aed..3715f448d0 100644
--- a/modules/gapi/src/executor/gstreamingexecutor.cpp
+++ b/modules/gapi/src/executor/gstreamingexecutor.cpp
@@ -7,7 +7,6 @@
 #include "precomp.hpp"

 #include <memory> // make_shared
-#include
 #include
@@ -583,10 +582,16 @@ class StreamingOutput final: public cv::gimpl::GIslandExecutable::IOutput
     std::vector< std::vector > &m_out_queues;
     std::shared_ptr m_island;

+    // NB: StreamingOutput has to be thread-safe.
+    // The current synchronization approach is quite poor and inefficient.
+    mutable std::mutex m_mutex;
+
     // Allocate a new data object for output under idx
     // Prepare this object for posting
     virtual cv::GRunArgP get(int idx) override
     {
+        std::lock_guard<std::mutex> lock{m_mutex};
+
         using MatType = cv::Mat;
         using SclType = cv::Scalar;
@@ -663,6 +668,8 @@ class StreamingOutput final: public cv::gimpl::GIslandExecutable::IOutput
     }
     virtual void post(cv::GRunArgP&& argp) override
     {
+        std::lock_guard<std::mutex> lock{m_mutex};
+
         // Mark the output ready for posting. If it is the first in the line,
         // actually post it and all its successors which are ready for posting too.
         auto it = m_postIdx.find(cv::gimpl::proto::ptr(argp));
@@ -700,6 +707,7 @@ class StreamingOutput final: public cv::gimpl::GIslandExecutable::IOutput
     }
     virtual void post(cv::gimpl::EndOfStream&&) override
     {
+        std::lock_guard<std::mutex> lock{m_mutex};
         // If the posting list is empty, just broadcast the stop message.
         // If it is not, enqueue the Stop message in the postings list.
         for (auto &&it : ade::util::indexed(m_postings))
@@ -725,6 +733,7 @@ class StreamingOutput final: public cv::gimpl::GIslandExecutable::IOutput
     }
     void meta(const cv::GRunArgP &out, const cv::GRunArg::Meta &m) override
     {
+        std::lock_guard<std::mutex> lock{m_mutex};
         const auto it = m_postIdx.find(cv::gimpl::proto::ptr(out));
         GAPI_Assert(it != m_postIdx.end());
@@ -747,6 +756,7 @@ public:

     bool done() const
     {
+        std::lock_guard<std::mutex> lock{m_mutex};
         // The streaming actor work is considered DONE for this stream
         // when it posted/resent all STOP messages to all its outputs.
return m_stops_sent == desc().size(); diff --git a/modules/gapi/test/infer/gapi_infer_ie_test.cpp b/modules/gapi/test/infer/gapi_infer_ie_test.cpp index 93571e5fd1..a996430061 100644 --- a/modules/gapi/test/infer/gapi_infer_ie_test.cpp +++ b/modules/gapi/test/infer/gapi_infer_ie_test.cpp @@ -953,6 +953,16 @@ TEST_F(ROIListNV12, Infer2MediaInputNV12) validate(); } +TEST(Infer, SetInvalidNumberOfRequests) +{ + using AGInfo = std::tuple; + G_API_NET(AgeGender, , "test-age-gender"); + + cv::gapi::ie::Params pp{"model", "weights", "device"}; + + EXPECT_ANY_THROW(pp.cfgNumRequests(0u)); +} + TEST(Infer, TestStreamingInfer) { initTestDataPath(); @@ -980,7 +990,8 @@ TEST(Infer, TestStreamingInfer) auto pp = cv::gapi::ie::Params { params.model_path, params.weights_path, params.device_id - }.cfgOutputLayers({ "age_conv3", "prob" }); + }.cfgOutputLayers({ "age_conv3", "prob" }) + .cfgNumRequests(4u); std::size_t num_frames = 0u; @@ -1049,7 +1060,8 @@ TEST(InferROI, TestStreamingInfer) auto pp = cv::gapi::ie::Params { params.model_path, params.weights_path, params.device_id - }.cfgOutputLayers({ "age_conv3", "prob" }); + }.cfgOutputLayers({ "age_conv3", "prob" }) + .cfgNumRequests(4u); std::size_t num_frames = 0u; @@ -1131,7 +1143,8 @@ TEST(InferList, TestStreamingInfer) auto pp = cv::gapi::ie::Params { params.model_path, params.weights_path, params.device_id - }.cfgOutputLayers({ "age_conv3", "prob" }); + }.cfgOutputLayers({ "age_conv3", "prob" }) + .cfgNumRequests(4u); std::size_t num_frames = 0u; @@ -1222,8 +1235,8 @@ TEST(Infer2, TestStreamingInfer) auto pp = cv::gapi::ie::Params { params.model_path, params.weights_path, params.device_id - }.cfgOutputLayers({ "age_conv3", "prob" }); - + }.cfgOutputLayers({ "age_conv3", "prob" }) + .cfgNumRequests(4u); std::size_t num_frames = 0u; std::size_t max_frames = 10u; @@ -1311,8 +1324,8 @@ TEST(InferEmptyList, TestStreamingInfer) auto pp = cv::gapi::ie::Params { params.model_path, params.weights_path, params.device_id - }.cfgOutputLayers({ "age_conv3", "prob" }); - + }.cfgOutputLayers({ "age_conv3", "prob" }) + .cfgNumRequests(4u); std::size_t num_frames = 0u; std::size_t max_frames = 1u; @@ -1366,8 +1379,8 @@ TEST(Infer2EmptyList, TestStreamingInfer) auto pp = cv::gapi::ie::Params { params.model_path, params.weights_path, params.device_id - }.cfgOutputLayers({ "age_conv3", "prob" }); - + }.cfgOutputLayers({ "age_conv3", "prob" }) + .cfgNumRequests(4u); std::size_t num_frames = 0u; std::size_t max_frames = 1u;
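
For reference, this is how the new option is meant to be used from application code, mirroring the streaming tests above. The network definition and layer names come from the test file; the model, weights and device strings are placeholders rather than real files.

#include <tuple>

#include <opencv2/gapi/gmat.hpp>
#include <opencv2/gapi/infer.hpp>
#include <opencv2/gapi/infer/ie.hpp>

using AGInfo = std::tuple<cv::GMat, cv::GMat>;
G_API_NET(AgeGender, <AGInfo(cv::GMat)>, "test-age-gender");

int main() {
    // nireq defaults to 1u; cfgNumRequests(0) trips the GAPI_Assert added in ie.hpp.
    auto pp = cv::gapi::ie::Params<AgeGender> {
        "age-gender.xml", "age-gender.bin", "CPU"   // model/weights/device are placeholders
    }.cfgOutputLayers({ "age_conv3", "prob" })
     .cfgNumRequests(4u);                           // allow up to 4 concurrent infer requests
    (void)pp;
    return 0;
}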