Merge pull request #18673 from dmatveev:dm/upstream_desync
commit
691c3d1e3c
40 changed files with 2827 additions and 195 deletions
@ -0,0 +1,84 @@ |
||||
// This file is part of OpenCV project.
|
||||
// It is subject to the license terms in the LICENSE file found in the top-level directory
|
||||
// of this distribution and at http://opencv.org/license.html.
|
||||
//
|
||||
// Copyright (C) 2020 Intel Corporation
|
||||
|
||||
|
||||
#ifndef OPENCV_GAPI_GSTREAMING_DESYNC_HPP |
||||
#define OPENCV_GAPI_GSTREAMING_DESYNC_HPP |
||||
|
||||
#include <tuple> |
||||
|
||||
#include <opencv2/gapi/util/util.hpp> |
||||
#include <opencv2/gapi/gtype_traits.hpp> |
||||
#include <opencv2/gapi/garg.hpp> |
||||
#include <opencv2/gapi/gcall.hpp> |
||||
#include <opencv2/gapi/gkernel.hpp> |
||||
|
||||
namespace cv { |
||||
namespace gapi { |
||||
namespace streaming { |
||||
|
||||
namespace detail { |
||||
struct GDesync { |
||||
static const char *id() { |
||||
return "org.opencv.streaming.desync"; |
||||
} |
||||
|
||||
// An universal yield for desync.
|
||||
// Yields output objects according to the input Types...
|
||||
// Reuses gkernel machinery.
|
||||
// FIXME: This function can be generic and declared in gkernel.hpp
|
||||
// (it is there already, but a part of GKernelType[M]
|
||||
template<typename... R, int... IIs> |
||||
static std::tuple<R...> yield(cv::GCall &call, cv::detail::Seq<IIs...>) { |
||||
return std::make_tuple(cv::detail::Yield<R>::yield(call, IIs)...); |
||||
} |
||||
}; |
||||
|
||||
template<typename G> |
||||
G desync(const G &g) { |
||||
cv::GKernel k{ |
||||
GDesync::id() // kernel id
|
||||
, "" // kernel tag
|
||||
, [](const GMetaArgs &a, const GArgs &) {return a;} // outMeta callback
|
||||
, {cv::detail::GTypeTraits<G>::shape} // output Shape
|
||||
, {cv::detail::GTypeTraits<G>::op_kind} // input data kinds
|
||||
, {cv::detail::GObtainCtor<G>::get()} // output template ctors
|
||||
}; |
||||
cv::GCall call(std::move(k)); |
||||
call.pass(g); |
||||
return std::get<0>(GDesync::yield<G>(call, cv::detail::MkSeq<1>::type())); |
||||
} |
||||
} // namespace detail
|
||||
|
||||
/**
|
||||
* @brief Starts a desynchronized branch in the graph. |
||||
* |
||||
* This operation takes a single G-API data object and returns a |
||||
* graph-level "duplicate" of this object. |
||||
* |
||||
* Operations which use this data object can be desynchronized |
||||
* from the rest of the graph. |
||||
* |
||||
* This operation has no effect when a GComputation is compiled with |
||||
* regular cv::GComputation::compile(), since cv::GCompiled objects |
||||
* always produce their full output vectors. |
||||
* |
||||
* This operation only makes sense when a GComputation is compiled in |
||||
* straming mode with cv::GComputation::compileStreaming(). If this |
||||
* operation is used and there are desynchronized outputs, the user |
||||
* should use a special version of cv::GStreamingCompiled::pull() |
||||
* which produces an array of cv::util::optional<> objects. |
||||
* |
||||
* @note This feature is highly experimental now and is currently |
||||
* limited to a single GMat argument only. |
||||
*/ |
||||
GAPI_EXPORTS GMat desync(const GMat &g); |
||||
|
||||
} // namespace streaming
|
||||
} // namespace gapi
|
||||
} // namespace cv
|
||||
|
||||
#endif // OPENCV_GAPI_GSTREAMING_DESYNC_HPP
|
@ -0,0 +1,264 @@ |
||||
#include <algorithm> |
||||
#include <iostream> |
||||
#include <sstream> |
||||
|
||||
#include <opencv2/imgproc.hpp> |
||||
#include <opencv2/imgcodecs.hpp> |
||||
#include <opencv2/gapi.hpp> |
||||
#include <opencv2/gapi/core.hpp> |
||||
#include <opencv2/gapi/imgproc.hpp> |
||||
#include <opencv2/gapi/infer.hpp> |
||||
#include <opencv2/gapi/render.hpp> |
||||
#include <opencv2/gapi/infer/ie.hpp> |
||||
#include <opencv2/gapi/cpu/gcpukernel.hpp> |
||||
#include <opencv2/gapi/streaming/cap.hpp> |
||||
#include <opencv2/highgui.hpp> |
||||
|
||||
// Command-line options of the sample in the cv::CommandLineParser
// format: "{ name [alias] | default value | description }".
const std::string keys =
    "{ h help | | Print this help message }"
    "{ input | | Path to the input video file }"
    "{ facem | face-detection-adas-0001.xml | Path to OpenVINO IE face detection model (.xml) }"
    "{ faced | CPU | Target device for face detection model (e.g. CPU, GPU, VPU, ...) }"
    "{ r roi | -1,-1,-1,-1 | Region of interest (ROI) to use for inference. Identified automatically when not set }";
||||
|
||||
namespace { |
||||
|
||||
// Builds the path to the weights file (.bin) out of the path to the
// topology file: the last four characters of `model_path` must be
// ".xml" (in any letter case) and are replaced with ".bin".
// Asserts if the path is too short or has a different extension.
std::string weights_path(const std::string &model_path) {
    const auto EXT_LEN = 4u;
    const auto sz = model_path.size();
    CV_Assert(sz > EXT_LEN);

    // Take the extension and lower-case it character by character
    std::string ext(model_path, sz - EXT_LEN);
    for (auto &ch : ext) {
        ch = static_cast<unsigned char>(std::tolower(static_cast<unsigned char>(ch)));
    }
    CV_Assert(ext == ".xml");

    std::string result = model_path.substr(0u, sz - EXT_LEN);
    result += ".bin";
    return result;
}
||||
|
||||
// Parses a region of interest given as "x,y,width,height".
//
// @param rc the textual ROI specification.
// @return the parsed rectangle, or an empty optional if the text is
//         malformed, a separator is not ',', x/y are negative, or
//         width/height are not positive.
cv::util::optional<cv::Rect> parse_roi(const std::string &rc) {
    cv::Rect rv;
    // Zero-initialized so that the separators are never read as
    // indeterminate values, whatever the extraction below does.
    char delim[3] = {};

    std::stringstream is(rc);
    is >> rv.x >> delim[0] >> rv.y >> delim[1] >> rv.width >> delim[2] >> rv.height;
    // A formatted extraction which can't parse its input sets failbit,
    // not badbit, so fail() (failbit or badbit) is the check to use.
    if (is.fail()) {
        return cv::util::optional<cv::Rect>(); // empty value
    }
    const auto is_delim = [](char c) {
        return c == ',';
    };
    if (!std::all_of(std::begin(delim), std::end(delim), is_delim)) {
        return cv::util::optional<cv::Rect>(); // empty value
    }
    if (rv.x < 0 || rv.y < 0 || rv.width <= 0 || rv.height <= 0) {
        return cv::util::optional<cv::Rect>(); // empty value
    }
    return cv::util::make_optional(std::move(rv));
}
||||
|
||||
} // namespace
|
||||
|
||||
namespace custom { |
||||
|
||||
G_API_NET(FaceDetector, <cv::GMat(cv::GMat)>, "face-detector"); |
||||
|
||||
using GDetections = cv::GArray<cv::Rect>; |
||||
using GRect = cv::GOpaque<cv::Rect>; |
||||
using GSize = cv::GOpaque<cv::Size>; |
||||
using GPrims = cv::GArray<cv::gapi::wip::draw::Prim>; |
||||
|
||||
// Operation: GMat -> GOpaque<cv::Size>.
G_API_OP(GetSize, <GSize(cv::GMat)>, "sample.custom.get-size") {
    // The output description is an empty GOpaque desc regardless of the input
    static cv::GOpaqueDesc outMeta(const cv::GMatDesc &) {
        return cv::empty_gopaque_desc();
    }
};
||||
|
||||
// Operation: GMat -> GOpaque<cv::Rect>.
G_API_OP(LocateROI, <GRect(cv::GMat)>, "sample.custom.locate-roi") {
    // The output description is an empty GOpaque desc regardless of the input
    static cv::GOpaqueDesc outMeta(const cv::GMatDesc &) {
        return cv::empty_gopaque_desc();
    }
};
||||
|
||||
// Operation: (GMat, GOpaque<cv::Rect>, GOpaque<cv::Size>) -> GArray<cv::Rect>.
G_API_OP(ParseSSD, <GDetections(cv::GMat, GRect, GSize)>, "sample.custom.parse-ssd") {
    // The output description is an empty GArray desc regardless of the inputs
    static cv::GArrayDesc outMeta(const cv::GMatDesc &,
                                  const cv::GOpaqueDesc &,
                                  const cv::GOpaqueDesc &) {
        return cv::empty_array_desc();
    }
};
||||
|
||||
// Operation: (GArray<cv::Rect>, GOpaque<cv::Rect>) -> GArray<Prim>.
G_API_OP(BBoxes, <GPrims(GDetections, GRect)>, "sample.custom.b-boxes") {
    // The output description is an empty GArray desc regardless of the inputs
    static cv::GArrayDesc outMeta(const cv::GArrayDesc &,
                                  const cv::GOpaqueDesc &) {
        return cv::empty_array_desc();
    }
};
||||
|
||||
// OpenCV implementation of GetSize: reports the input matrix
// dimensions as (width = cols, height = rows).
GAPI_OCV_KERNEL(OCVGetSize, GetSize) {
    static void run(const cv::Mat &in, cv::Size &out) {
        out = cv::Size(in.cols, in.rows);
    }
};
||||
|
||||
// OpenCV implementation of LocateROI.
//
// This is the place where extra analytics could be run on the input
// frame in order to select the region of interest (ROI) where the
// objects are detected (or any other inference is run).
//
// Currently it doesn't do anything intelligent: it only selects the
// central square of the input image (a square is the most convenient
// aspect ratio for detectors to use).
GAPI_OCV_KERNEL(OCVLocateROI, LocateROI) {
    static void run(const cv::Mat &in_mat, cv::Rect &out_rect) {
        // The square side is the shorter image dimension
        const int side     = std::min(in_mat.cols, in_mat.rows);
        const int center_x = in_mat.cols/2;
        const int center_y = in_mat.rows/2;

        // The square is placed around the image center
        out_rect = cv::Rect(center_x - side/2,
                            center_y - side/2,
                            side,
                            side);
    }
};
||||
|
||||
// OpenCV implementation of ParseSSD.
//
// Reads the 4-dimensional detector output as a sequence of proposals
// of 7 floats each:
//   [image_id, label, confidence, left, top, right, bottom]
// and converts the confident ones into rectangles. The box coordinates
// are scaled by the ROI size, shifted by the ROI origin, and clipped
// to the parent image surface.
GAPI_OCV_KERNEL(OCVParseSSD, ParseSSD) {
    static void run(const cv::Mat &in_ssd_result,
                    const cv::Rect &in_roi,
                    const cv::Size &in_parent_size,
                    std::vector<cv::Rect> &out_objects) {
        const auto &in_ssd_dims = in_ssd_result.size;
        CV_Assert(in_ssd_dims.dims() == 4u);

        const int MAX_PROPOSALS = in_ssd_dims[2];
        const int OBJECT_SIZE   = in_ssd_dims[3];
        CV_Assert(OBJECT_SIZE == 7); // fixed SSD object size

        const cv::Size up_roi = in_roi.size();
        const cv::Rect surface({0,0}, in_parent_size);

        out_objects.clear();

        const float *data = in_ssd_result.ptr<float>();
        for (int i = 0; i < MAX_PROPOSALS; i++) {
            // Start of the i-th proposal record
            const float *obj = data + i * OBJECT_SIZE;
            const float image_id   = obj[0];
            // obj[1] is the class label, it is not used here
            const float confidence = obj[2];
            const float rc_left    = obj[3];
            const float rc_top     = obj[4];
            const float rc_right   = obj[5];
            const float rc_bottom  = obj[6];

            if (image_id < 0.f) {
                break;    // marks end-of-detections
            }
            if (confidence < 0.5f) {
                continue; // skip objects with low confidence
            }

            // Map the relative coordinates to the original image scale,
            // taking the ROI into account
            const int left   = static_cast<int>(rc_left   * up_roi.width);
            const int top    = static_cast<int>(rc_top    * up_roi.height);
            const int right  = static_cast<int>(rc_right  * up_roi.width);
            const int bottom = static_cast<int>(rc_bottom * up_roi.height);

            const cv::Rect rc(left + in_roi.x,
                              top  + in_roi.y,
                              right  - left,
                              bottom - top);
            out_objects.emplace_back(rc & surface);
        }
    }
};
||||
|
||||
// OpenCV implementation of BBoxes: converts rectangles into G-API's
// rendering primitives. The ROI goes first (cyan), then one rectangle
// per detected face (green); all are drawn with thickness 2.
GAPI_OCV_KERNEL(OCVBBoxes, BBoxes) {
    static void run(const std::vector<cv::Rect> &in_face_rcs,
                    const cv::Rect &in_roi,
                    std::vector<cv::gapi::wip::draw::Prim> &out_prims) {
        namespace draw = cv::gapi::wip::draw;
        const int thickness = 2;

        out_prims.clear();

        const cv::Scalar cyan = CV_RGB(0,255,255);
        out_prims.emplace_back(draw::Rect(in_roi, cyan, thickness));

        const cv::Scalar green = CV_RGB(0,255,0);
        for (const auto &face : in_face_rcs) {
            out_prims.emplace_back(draw::Rect(face, green, thickness));
        }
    }
};
||||
|
||||
} // namespace custom
|
||||
|
||||
int main(int argc, char *argv[]) |
||||
{ |
||||
cv::CommandLineParser cmd(argc, argv, keys); |
||||
if (cmd.has("help")) { |
||||
cmd.printMessage(); |
||||
return 0; |
||||
} |
||||
|
||||
// Prepare parameters first
|
||||
const std::string input = cmd.get<std::string>("input"); |
||||
const auto opt_roi = parse_roi(cmd.get<std::string>("roi")); |
||||
|
||||
const auto face_model_path = cmd.get<std::string>("facem"); |
||||
auto face_net = cv::gapi::ie::Params<custom::FaceDetector> { |
||||
face_model_path, // path to topology IR
|
||||
weights_path(face_model_path), // path to weights
|
||||
cmd.get<std::string>("faced"), // device specifier
|
||||
}; |
||||
auto kernels = cv::gapi::kernels |
||||
< custom::OCVGetSize |
||||
, custom::OCVLocateROI |
||||
, custom::OCVParseSSD |
||||
, custom::OCVBBoxes>(); |
||||
auto networks = cv::gapi::networks(face_net); |
||||
|
||||
// Now build the graph. The graph structure may vary
|
||||
// pased on the input parameters
|
||||
cv::GStreamingCompiled pipeline; |
||||
auto inputs = cv::gin(cv::gapi::wip::make_src<cv::gapi::wip::GCaptureSource>(input)); |
||||
|
||||
if (opt_roi.has_value()) { |
||||
// Use the value provided by user
|
||||
std::cout << "Will run inference for static region " |
||||
<< opt_roi.value() |
||||
<< " only" |
||||
<< std::endl; |
||||
cv::GMat in; |
||||
cv::GOpaque<cv::Rect> in_roi; |
||||
auto blob = cv::gapi::infer<custom::FaceDetector>(in_roi, in); |
||||
auto rcs = custom::ParseSSD::on(blob, in_roi, custom::GetSize::on(in)); |
||||
auto out = cv::gapi::wip::draw::render3ch(in, custom::BBoxes::on(rcs, in_roi)); |
||||
pipeline = cv::GComputation(cv::GIn(in, in_roi), cv::GOut(out)) |
||||
.compileStreaming(cv::compile_args(kernels, networks)); |
||||
|
||||
// Since the ROI to detect is manual, make it part of the input vector
|
||||
inputs.push_back(cv::gin(opt_roi.value())[0]); |
||||
} else { |
||||
// Automatically detect ROI to infer. Make it output parameter
|
||||
std::cout << "ROI is not set or invalid. Locating it automatically" |
||||
<< std::endl; |
||||
cv::GMat in; |
||||
cv::GOpaque<cv::Rect> roi = custom::LocateROI::on(in); |
||||
auto blob = cv::gapi::infer<custom::FaceDetector>(roi, in); |
||||
auto rcs = custom::ParseSSD::on(blob, roi, custom::GetSize::on(in)); |
||||
auto out = cv::gapi::wip::draw::render3ch(in, custom::BBoxes::on(rcs, roi)); |
||||
pipeline = cv::GComputation(cv::GIn(in), cv::GOut(out)) |
||||
.compileStreaming(cv::compile_args(kernels, networks)); |
||||
} |
||||
|
||||
// The execution part
|
||||
pipeline.setSource(std::move(inputs)); |
||||
pipeline.start(); |
||||
|
||||
cv::Mat out; |
||||
while (pipeline.pull(cv::gout(out))) { |
||||
cv::imshow("Out", out); |
||||
cv::waitKey(1); |
||||
} |
||||
return 0; |
||||
} |
@ -0,0 +1,74 @@ |
||||
// This file is part of OpenCV project.
|
||||
// It is subject to the license terms in the LICENSE file found in the top-level directory
|
||||
// of this distribution and at http://opencv.org/license.html.
|
||||
//
|
||||
// Copyright (C) 2020 Intel Corporation
|
||||
|
||||
#include "precomp.hpp" |
||||
|
||||
#include <opencv2/gapi/streaming/desync.hpp> |
||||
#include <opencv2/gapi/core.hpp> |
||||
|
||||
cv::GMat cv::gapi::streaming::desync(const cv::GMat &g) {
    // FIXME: this is a limited implementation of desync.
    // The real implementation must be generic (template) and
    // reside in desync.hpp (and it is detail::desync<>()).
    //
    // FIXME: A copy is put right after detail::desync<>() to solve
    // the problem described below.
    // FIXME: Because of the copy, the desync functionality is limited
    // to GMat only (there is no generic copy kernel for other
    // object types).
    //
    // The problem:
    //
    // If consumed by multiple different islands (OCV and Fluid, for
    // example), an object needs to be desynchronized individually
    // for every path.
    //
    // This is a limitation of the current implementation. It works
    // this way: every "desync" link from the main path to a new
    // desync path gets its "DesyncQueue" object which stores only the
    // last value written before the desync object (DO) it consumes
    // (a container of type "last written value", or LWV).
    //
    //                         LWV
    //  [Sync path] -> desync() - - > DO -> [ISL0 @ Desync path #1]
    //
    // At the same time, generally, every island in the streaming
    // graph gets its individual input as a queue (so normally, a
    // writer pushes the same output MULTIPLE TIMES if it has multiple
    // readers):
    //
    //                         LWV
    //  [Sync path] -> desync() - - > DO1 -> [ISL0 @ Desync path #1]
    //                       :  LWV
    //                       ' - - > DO2 -> [ISL1 @ Desync path #1]
    //
    // For users, it may seem legit to use desync here only once, and
    // it MUST BE legit once the problem is fixed.
    // But the problem with the current implementation is that islands
    // on the same desync path get different desync queues and in fact
    // stay desynchronized between each other. One shouldn't consider
    // this as a single desync path anymore.
    // If these two ISLs are then merged e.g. with add(a,b), the
    // results will be inconsistent, given that the latency of ISL0
    // and ISL1 may be different. It is not the same frame anymore
    // coming as `a` and `b` to add(a,b) because of it.
    //
    // To make things clear, this is forbidden now and one is asked to
    // call desync one more time to allow that. It is bad since the
    // graph structure and island layout depend on the kernel packages
    // used, not on the sole GComputation structure. This needs to be
    // fixed! Here's the working configuration:
    //
    //                         LWV
    //  [Sync path] -> desync() - - > DO1 -> [ISL0 @ Desync path #1]
    //           :            LWV
    //           '-> desync() - - > DO2 -> [ISL1 @ Desync path #2] <-(!)
    //
    // Putting an operation right after desync() is a quick workaround
    // for this synchronization problem. There will be one
    // "last_written_value" connected to a desynchronized data object,
    // and this sole last_written_value object will feed both branches
    // of the streaming executable.
    const cv::GMat desynchronized = detail::desync(g);
    return cv::gapi::copy(desynchronized);
}
@ -0,0 +1,305 @@ |
||||
// This file is part of OpenCV project.
|
||||
// It is subject to the license terms in the LICENSE file found in the top-level directory
|
||||
// of this distribution and at http://opencv.org/license.html.
|
||||
//
|
||||
// Copyright (C) 2020 Intel Corporation
|
||||
|
||||
|
||||
#include "precomp.hpp" |
||||
|
||||
#include <ade/util/algorithm.hpp> |
||||
#include <ade/util/zip_range.hpp> |
||||
#include <opencv2/gapi/streaming/desync.hpp>// GDesync intrinsic |
||||
|
||||
#include "compiler/gmodel.hpp" |
||||
#include "compiler/passes/passes.hpp" |
||||
|
||||
namespace desync { |
||||
namespace { |
||||
|
||||
// Drop the desynchronized node `nh` from the graph, reconnect the
|
||||
// graph structure properly. This is a helper function which is used
|
||||
// in both drop(g) and apply(g) passes.
|
||||
//
|
||||
// @return a vector of new edge handles connecting the "main" graph
|
||||
// with its desynchronized part.
|
||||
std::vector<ade::EdgeHandle> drop(cv::gimpl::GModel::Graph &g, |
||||
ade::NodeHandle nh) { |
||||
using namespace cv::gimpl; |
||||
|
||||
// What we need to do here:
|
||||
// 1. Connect the readers of its produced data objects
|
||||
// to the input data objects of desync;
|
||||
// 2. Drop the data object it produces.
|
||||
// 3. Drop the desync operation itself;
|
||||
std::vector<ade::NodeHandle> in_data_objs = GModel::orderedInputs(g, nh); |
||||
std::vector<ade::NodeHandle> out_data_objs = GModel::orderedOutputs(g, nh); |
||||
std::vector<ade::EdgeHandle> new_links; |
||||
GAPI_Assert(in_data_objs.size() == out_data_objs.size()); |
||||
GAPI_DbgAssert(ade::util::all_of |
||||
(out_data_objs, |
||||
[&](const ade::NodeHandle &oh) { |
||||
return g.metadata(oh).contains<Data>(); |
||||
})); |
||||
// (1)
|
||||
for (auto &&it: ade::util::zip(ade::util::toRange(in_data_objs), |
||||
ade::util::toRange(out_data_objs))) { |
||||
auto these_new_links = GModel::redirectReaders(g, |
||||
std::get<1>(it), |
||||
std::get<0>(it)); |
||||
new_links.insert(new_links.end(), |
||||
these_new_links.begin(), |
||||
these_new_links.end()); |
||||
} |
||||
// (2)
|
||||
for (auto &&old_out_nh : out_data_objs) { |
||||
g.erase(old_out_nh); |
||||
} |
||||
// (3)
|
||||
g.erase(nh); |
||||
|
||||
return new_links; |
||||
} |
||||
|
||||
// Tracing a desynchronizing subgraph is somewhat tricky and happens
|
||||
// in both directions: downwards and upwards.
|
||||
//
|
||||
// The downward process is the basic one: we start with a "desync"
|
||||
// OP node and go down to the graph using the "output" edges. We check
|
||||
// if all nodes on this path [can] belong to this desynchronized path
|
||||
// and don't overlap with others.
|
||||
//
|
||||
// An important contract to maintain is that the desynchronized part
|
||||
// can't have any input references from the "main" graph part or any
|
||||
// other desynchronized part in the graph. This contract is validated
|
||||
// by checking every node's input which must belong to the same
|
||||
// desynchronized part.
|
||||
//
|
||||
// Here is the pitfall of this check:
|
||||
//
|
||||
// v
|
||||
// GMat_0
|
||||
// v
|
||||
// +----------+
|
||||
// | desync() | <- This point originates the traceDown process
|
||||
// +----------+
|
||||
// v
|
||||
// GMat_0' <- This node will be tagged for this desync at
|
||||
// :--------. step 0/1
|
||||
// v : <- The order how output nodes are visited is not
|
||||
// +----------+ : specified, we can visit Op2() first (as there
|
||||
// | Op1() | : is a direct link) bypassing visiting and tagging
|
||||
// +----------+ : Op1() and GMat_1
|
||||
// v :
|
||||
// GMat_1 :
|
||||
// : .---'
|
||||
// v v <- When we visit Op2() via the 2nd edge on this
|
||||
// +----------+ graph, we check if all inputs belong to the same
|
||||
// | Op2() | desynchronized graph and GMat_1 fails this check
|
||||
// +----------+ (since the traceDown() process hasn't visited
|
||||
// it yet).
|
||||
//
|
||||
// Cases like this originate the traceUp() process: if we find an
|
||||
// input node in our desynchronized path which doesn't belong to this
|
||||
// path YET, it is not 100% a problem, and we need to trace it back
|
||||
// (upwards) to see if it is really a case.
|
||||
|
||||
// This recursive function checks the desync_id in the graph upwards.
|
||||
// The process doesn't continue for nodes which have a valid
|
||||
// desync_id already.
|
||||
// The process only continues for nodes which have no desync_id
|
||||
// assigned. If there's no such nodes anymore, the procedure is
|
||||
// considered complete and a list of nodes to tag is returned to the
|
||||
// caller.
|
||||
//
|
||||
// If NO inputs of this node have a valid desync_id, the desync
|
||||
// invariant is broken and the function throws.
|
||||
void traceUp(cv::gimpl::GModel::Graph &g, |
||||
const ade::NodeHandle &nh, |
||||
int desync_id, |
||||
std::vector<ade::NodeHandle> &path) { |
||||
using namespace cv::gimpl; |
||||
|
||||
GAPI_Assert(!nh->inNodes().empty() |
||||
&& "traceUp: a desynchronized part of the graph is not isolated?"); |
||||
|
||||
if (g.metadata(nh).contains<DesyncPath>()) { |
||||
// We may face nodes which have DesyncPath already visited during
|
||||
// this recursive process (e.g. via some other output or branch in the
|
||||
// subgraph)
|
||||
if (g.metadata(nh).get<DesyncPath>().index != desync_id) { |
||||
GAPI_Assert(false && "Desynchronization can't be nested!"); |
||||
} |
||||
return; // This object belongs to the desync path - exit early.
|
||||
} |
||||
|
||||
// Regardless of the result, put this nh to the path
|
||||
path.push_back(nh); |
||||
|
||||
// Check if the input nodes are OK
|
||||
std::vector<ade::NodeHandle> nodes_to_trace; |
||||
nodes_to_trace.reserve(nh->inNodes().size()); |
||||
for (auto &&in_nh : nh->inNodes()) { |
||||
if (g.metadata(in_nh).contains<DesyncPath>()) { |
||||
// We may face nodes which have DesyncPath already visited during
|
||||
// this recursive process (e.g. via some other output or branch in the
|
||||
// subgraph)
|
||||
GAPI_Assert(g.metadata(in_nh).get<DesyncPath>().index == desync_id |
||||
&& "Desynchronization can't be nested!"); |
||||
} else { |
||||
nodes_to_trace.push_back(in_nh); |
||||
} |
||||
} |
||||
|
||||
// If there are nodes to trace, continue the recursion
|
||||
for (auto &&up_nh : nodes_to_trace) { |
||||
traceUp(g, up_nh, desync_id, path); |
||||
} |
||||
} |
||||
|
||||
// This recursive function propagates the desync_id down to the graph
|
||||
// starting at nh, and also checks:
|
||||
// - if this desync path is isolated;
|
||||
// - if this desync path is not overlapped.
|
||||
// It also originates the traceUp() process at the points of
|
||||
// uncertainty (as described in the comment above).
|
||||
void traceDown(cv::gimpl::GModel::Graph &g, |
||||
const ade::NodeHandle &nh, |
||||
int desync_id) { |
||||
using namespace cv::gimpl; |
||||
|
||||
if (g.metadata(nh).contains<DesyncPath>()) { |
||||
// We may face nodes which have DesyncPath already visited during
|
||||
// this recursive process (e.g. via some other output or branch in the
|
||||
// subgraph)
|
||||
GAPI_Assert(g.metadata(nh).get<DesyncPath>().index == desync_id |
||||
&& "Desynchronization can't be nested!"); |
||||
} else { |
||||
g.metadata(nh).set(DesyncPath{desync_id}); |
||||
} |
||||
|
||||
// All inputs of this data object must belong to the same
|
||||
// desync path.
|
||||
for (auto &&in_nh : nh->inNodes()) { |
||||
// If an input object is not assigned to this desync path,
|
||||
// it does not means that the object doesn't belong to
|
||||
// this path. Check it.
|
||||
std::vector<ade::NodeHandle> path_up; |
||||
traceUp(g, in_nh, desync_id, path_up); |
||||
// We get here on success. Just set the proper tags for
|
||||
// the identified input path.
|
||||
for (auto &&up_nh : path_up) { |
||||
g.metadata(up_nh).set(DesyncPath{desync_id}); |
||||
} |
||||
} |
||||
|
||||
// Propagate the tag & check down
|
||||
for (auto &&out_nh : nh->outNodes()) { |
||||
traceDown(g, out_nh, desync_id); |
||||
} |
||||
} |
||||
|
||||
// Streaming case: ensure the graph has proper isolation of the
|
||||
// desynchronized parts, set proper Edge metadata hints for
|
||||
// GStreamingExecutable
|
||||
void apply(cv::gimpl::GModel::Graph &g) { |
||||
using namespace cv::gimpl; |
||||
|
||||
// Stage 0. Trace down the desync operations in the graph.
|
||||
// Tag them with their unique (per graph) identifiers.
|
||||
int total_desync = 0; |
||||
for (auto &&nh : g.nodes()) { |
||||
if (g.metadata(nh).get<NodeType>().t == NodeType::OP) { |
||||
const auto &op = g.metadata(nh).get<Op>(); |
||||
if (op.k.name == cv::gapi::streaming::detail::GDesync::id()) { |
||||
GAPI_Assert(!g.metadata(nh).contains<DesyncPath>() |
||||
&& "Desynchronization can't be nested!"); |
||||
const int this_desync_id = total_desync++; |
||||
g.metadata(nh).set(DesyncPath{this_desync_id}); |
||||
for (auto &&out_nh: nh->outNodes()) { |
||||
traceDown(g, out_nh, this_desync_id); |
||||
} |
||||
} // if (desync)
|
||||
} // if(OP)
|
||||
} // for(nodes)
|
||||
|
||||
// Tracing is done for all desync ops in the graph now.
|
||||
// Stage 1. Drop the desync operations from the graph, but mark
|
||||
// the desynchronized edges a special way.
|
||||
// The desynchronized edge is the edge which connects a main
|
||||
// subgraph data with a desynchronized subgraph data.
|
||||
std::vector<ade::NodeHandle> nodes(g.nodes().begin(), g.nodes().end()); |
||||
for (auto &&nh : nodes) { |
||||
if (nh == nullptr) { |
||||
// Some nodes could be dropped already during the procedure
|
||||
// thanks ADE their NodeHandles updated automatically
|
||||
continue; |
||||
} |
||||
if (g.metadata(nh).get<NodeType>().t == NodeType::OP) { |
||||
const auto &op = g.metadata(nh).get<Op>(); |
||||
if (op.k.name == cv::gapi::streaming::detail::GDesync::id()) { |
||||
auto index = g.metadata(nh).get<DesyncPath>().index; |
||||
auto new_links = drop(g, nh); |
||||
for (auto &&eh : new_links) { |
||||
g.metadata(eh).set(DesyncEdge{index}); |
||||
} |
||||
} // if (desync)
|
||||
} // if (Op)
|
||||
} // for(nodes)
|
||||
|
||||
// Stage 2. Put a synchronized tag if there were changes applied
|
||||
if (total_desync > 0) { |
||||
g.metadata().set(Desynchronized{}); |
||||
} |
||||
} |
||||
|
||||
// Probably the simplest case: desync makes no sense in the regular
|
||||
// compilation process, so just drop all its occurences in the graph,
|
||||
// reconnecting nodes properly.
|
||||
void drop(cv::gimpl::GModel::Graph &g) { |
||||
// FIXME: LOG here that we're dropping the desync operations as
|
||||
// they have no sense when compiling in the regular mode.
|
||||
using namespace cv::gimpl; |
||||
std::vector<ade::NodeHandle> nodes(g.nodes().begin(), g.nodes().end()); |
||||
for (auto &&nh : nodes) { |
||||
if (nh == nullptr) { |
||||
// Some nodes could be dropped already during the procedure
|
||||
// thanks ADE their NodeHandles updated automatically
|
||||
continue; |
||||
} |
||||
if (g.metadata(nh).get<NodeType>().t == NodeType::OP) { |
||||
const auto &op = g.metadata(nh).get<Op>(); |
||||
if (op.k.name == cv::gapi::streaming::detail::GDesync::id()) { |
||||
drop(g, nh); |
||||
} // if (desync)
|
||||
} // if (Op)
|
||||
} // for(nodes)
|
||||
} |
||||
|
||||
} // anonymous namespace
|
||||
} // namespace desync
|
||||
|
||||
// Compiler pass which handles the desync intrinsic. Does nothing if
// the graph is not marked with HasIntrinsics. Otherwise the desync
// operations are applied if the graph is marked as Streaming, and
// dropped if it is not.
void cv::gimpl::passes::intrinDesync(ade::passes::PassContext &ctx) {
    GModel::Graph gr(ctx.graph);
    if (!gr.metadata().contains<HasIntrinsics>()) {
        return;
    }

    if (gr.metadata().contains<Streaming>()) {
        desync::apply(gr); // Streaming compilation
    } else {
        desync::drop(gr);  // Regular compilation
    }
}
||||
|
||||
// Clears the HasIntrinsics flag if all intrinsics have been handled.
|
||||
void cv::gimpl::passes::intrinFinalize(ade::passes::PassContext &ctx) { |
||||
GModel::Graph gr(ctx.graph); |
||||
for (auto &&nh : gr.nodes()) { |
||||
if (gr.metadata(nh).get<NodeType>().t == NodeType::OP) { |
||||
const auto &op = gr.metadata(nh).get<Op>(); |
||||
if (is_intrinsic(op.k.name)) { |
||||
return; |
||||
} |
||||
} |
||||
} |
||||
// If reached here, really clear the flag
|
||||
gr.metadata().erase<HasIntrinsics>(); |
||||
} |
@ -0,0 +1,105 @@ |
||||
// This file is part of OpenCV project.
|
||||
// It is subject to the license terms in the LICENSE file found in the top-level directory
|
||||
// of this distribution and at http://opencv.org/license.html.
|
||||
//
|
||||
// Copyright (C) 2020 Intel Corporation
|
||||
|
||||
#ifndef OPENCV_GAPI_EXECUTOR_LAST_VALUE_HPP |
||||
#define OPENCV_GAPI_EXECUTOR_LAST_VALUE_HPP |
||||
|
||||
#include <mutex> |
||||
#include <condition_variable> |
||||
|
||||
#include <opencv2/gapi/util/optional.hpp> |
||||
#include <opencv2/gapi/own/assert.hpp> |
||||
|
||||
namespace cv { |
||||
namespace gapi { |
||||
namespace own { |
||||
|
||||
// This class implements a "Last Written Value" thing. Writer threads
|
||||
// (in our case, it is just one) can write as many values there as it
|
||||
// can.
|
||||
//
|
||||
// The reader thread gets only a value it gets at the time (or blocks
|
||||
// if there was no value written since the last read).
|
||||
//
|
||||
// Again, the implementation is highly inefficient right now.
|
||||
template<class T> |
||||
class last_written_value { |
||||
cv::util::optional<T> m_data; |
||||
|
||||
std::mutex m_mutex; |
||||
std::condition_variable m_cond_empty; |
||||
|
||||
void unsafe_pop(T &t); |
||||
|
||||
public: |
||||
last_written_value() {} |
||||
last_written_value(const last_written_value<T> &cc) |
||||
: m_data(cc.m_data) { |
||||
// FIXME: what to do with all that locks, etc?
|
||||
} |
||||
last_written_value(last_written_value<T> &&cc) |
||||
: m_data(std::move(cc.m_data)) { |
||||
// FIXME: what to do with all that locks, etc?
|
||||
} |
||||
|
||||
// FIXME: && versions
|
||||
void push(const T &t); |
||||
void pop(T &t); |
||||
bool try_pop(T &t); |
||||
|
||||
// Not thread-safe
|
||||
void clear(); |
||||
}; |
||||
|
||||
// Internal: do shared pop things assuming the lock is already there
|
||||
template<typename T> |
||||
void last_written_value<T>::unsafe_pop(T &t) { |
||||
GAPI_Assert(m_data.has_value()); |
||||
t = std::move(m_data.value()); |
||||
m_data.reset(); |
||||
} |
||||
|
||||
// Push an element to the queue. Blocking if there's no space left
|
||||
template<typename T> |
||||
void last_written_value<T>::push(const T& t) { |
||||
std::unique_lock<std::mutex> lock(m_mutex); |
||||
m_data = cv::util::make_optional(t); |
||||
lock.unlock(); |
||||
m_cond_empty.notify_one(); |
||||
} |
||||
|
||||
// Pop an element from the queue. Blocking if there's no items
|
||||
template<typename T> |
||||
void last_written_value<T>::pop(T &t) { |
||||
std::unique_lock<std::mutex> lock(m_mutex); |
||||
if (!m_data.has_value()) { |
||||
// if there is no data, wait
|
||||
m_cond_empty.wait(lock, [&](){return m_data.has_value();}); |
||||
} |
||||
unsafe_pop(t); |
||||
} |
||||
|
||||
// Try pop an element from the queue. Returns false if queue is empty
|
||||
template<typename T> |
||||
bool last_written_value<T>::try_pop(T &t) { |
||||
std::unique_lock<std::mutex> lock(m_mutex); |
||||
if (!m_data.has_value()) { |
||||
// if there is no data, return
|
||||
return false; |
||||
} |
||||
unsafe_pop(t); |
||||
return true; |
||||
} |
||||
|
||||
// Clear the value holder. This method is not thread-safe.
|
||||
template<typename T> |
||||
void last_written_value<T>::clear() { |
||||
m_data.reset(); |
||||
} |
||||
|
||||
}}} // namespace cv::gapi::own
|
||||
|
||||
#endif // OPENCV_GAPI_EXECUTOR_CONC_QUEUE_HPP
|
@ -0,0 +1,156 @@ |
||||
// This file is part of OpenCV project.
|
||||
// It is subject to the license terms in the LICENSE file found in the top-level directory
|
||||
// of this distribution and at http://opencv.org/license.html.
|
||||
//
|
||||
// Copyright (C) 2020 Intel Corporation
|
||||
|
||||
#include "../test_precomp.hpp" |
||||
|
||||
#include <unordered_set> |
||||
#include <thread> |
||||
|
||||
#include "executor/last_value.hpp" |
||||
|
||||
namespace opencv_test { |
||||
using namespace cv::gapi; |
||||
|
||||
// Single-threaded push/pop round trip: every pushed value must be the
// one which is popped right after.
TEST(LastValue, PushPop) {
    own::last_written_value<int> v;
    int i = 0;
    while (i < 100) {
        v.push(i);

        int x = 1;
        v.pop(x);
        EXPECT_EQ(x, i);

        i++;
    }
}
||||
|
||||
// try_pop() must fail on a freshly created (empty) holder and succeed
// once a value has been pushed.
TEST(LastValue, TryPop) {
    own::last_written_value<int> v;
    int x = 0;

    // Nothing has been written so far
    EXPECT_FALSE(v.try_pop(x));

    // Now there is a value to read
    v.push(1);
    EXPECT_TRUE(v.try_pop(x));
    EXPECT_EQ(1, x);
}
||||
|
||||
// clear() must drop the pushed value, so there is nothing to pop
// afterwards.
TEST(LastValue, Clear) {
    own::last_written_value<int> v;
    int x = 0;

    v.push(42);
    v.clear();

    EXPECT_FALSE(v.try_pop(x));
}
||||
|
||||
// Two writes in a row with no read in between: only the second
// (the last written) value must be observed by the reader.
TEST(LastValue, Overwrite) {
    own::last_written_value<int> v;
    int x = -1;

    v.push(42);
    v.push(0);

    EXPECT_TRUE(v.try_pop(x));
    EXPECT_EQ(0, x);
}
||||
|
||||
// In this test, every writer thread produces its own range of integer
|
||||
// numbers, writing those to a shared queue.
|
||||
//
|
||||
// Every reader thread pops elements from the queue (until -1 is
|
||||
// reached) and stores those in its own associated set.
|
||||
//
|
||||
// Finally, the master thread waits for completion of all other
// threads and verifies that every value read by the readers is one
// that a writer has produced (some written values may never be read,
// as a newer value overwrites an unread one).
|
||||
namespace {
// Stress test parameters, in this order:
//   <0> number of writer threads
//   <1> number of elements every writer produces
//   <2> number of reader threads
using StressParam = std::tuple<int, int, int>;

// A value writers push after their data to make the readers stop
constexpr int STOP_SIGN = -1;

// Distance between the value ranges of two neighbour writers
// (the writer number w starts its range at w*BASE)
constexpr int BASE = 1000;
} // anonymous namespace
||||
// Fixture of the multi-threaded stress test. Provides the bodies of
// the writer and the reader threads.
struct LastValue_: public ::testing::TestWithParam<StressParam> {
    using V = own::last_written_value<int>;
    using S = std::unordered_set<int>;

    // Writer thread: pushes base, base+1, ..., base+writes-1 to v
    // (sleeping for 1ms before every odd step), then pushes STOP_SIGN.
    static void writer(int base, int writes, V& v) {
        for (int step = 0; step < writes; ++step) {
            const bool odd_step = (step % 2) != 0;
            if (odd_step) {
                std::this_thread::sleep_for(std::chrono::milliseconds{1});
            }
            v.push(base + step);
        }
        v.push(STOP_SIGN);
    }

    // Reader thread: pops values from v and collects those in s until
    // STOP_SIGN is read.
    static void reader(V& v, S& s) {
        for (int value = 0;;) {
            v.pop(value);
            if (value != STOP_SIGN) {
                s.insert(value);
                continue;
            }
            // This thread has got the STOP_SIGN. Since pop() takes the
            // value away, put it back to let the other readers (if
            // any) see it as well. Note that with the
            // last_written_value semantic a STOP_SIGN may also be lost,
            // i.e. overwritten by another writer's value.
            v.push(STOP_SIGN);
            return;
        }
    }
};
||||
|
||||
// Stress test body: runs the requested number of readers and writers
// over a single shared value holder, then checks that every value
// the readers have collected is one of those the writers produce.
TEST_P(LastValue_, Test)
{
    const StressParam params = GetParam();
    const int num_writers = std::get<0>(params);
    const int num_writes  = std::get<1>(params);
    const int num_readers = std::get<2>(params);

    CV_Assert(num_writers < 20);
    CV_Assert(num_writes < BASE);

    V v;

    // Readers go first. Every reader has its own set to fill;
    // the storage vector is not resized after this point, so
    // the references passed to the threads stay valid.
    std::vector<S> storage(num_readers);
    std::vector<std::thread> readers;
    for (S& s : storage) {
        readers.emplace_back(reader, std::ref(v), std::ref(s));
    }

    // Then start the writers. The reference set collects every value
    // the writers are going to produce.
    S reference;
    std::vector<std::thread> writers;
    for (int w = 0; w < num_writers; w++) {
        const int range_start = w*BASE;
        writers.emplace_back(writer, range_start, num_writes, std::ref(v));
        for (int r = 0; r < num_writes; r++) {
            reference.insert(range_start + r);
        }
    }

    // Wait until everybody is done
    for (std::thread &t : readers) t.join();
    for (std::thread &t : writers) t.join();

    // Check the result: whatever has been read must be a value
    // which has been written.
    std::size_t num_values_read = 0u;
    for (const S &s : storage) {
        num_values_read += s.size();
        for (const int x : s) {
            EXPECT_TRUE(reference.count(x) > 0);
        }
    }
    // NOTE: Some combinations may end up with 0 values read, and it is
    // normal. The main thing is that the test shouldn't hang!
    EXPECT_LE(0u, num_values_read);
}
||||
|
||||
// Run the stress test for every combination of the values below
INSTANTIATE_TEST_CASE_P(LastValueStress, LastValue_,
                        Combine(Values(1, 2, 4, 8, 16),  // writers
                                Values(32, 96, 256),     // writes
                                Values(1, 2, 10)));      // readers
|
||||
} // namespace opencv_test
|
Loading…
Reference in new issue