Merge pull request #22596 from TolyaTalamanov:at/add-num-iter

[G-API] Pipeline modeling tool: Support num_iters criteria
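Summary (based on the diff below): a per-pipeline num_iters setting now takes priority over the global work_time, work_time itself becomes optional, and the tool fails with an explicit error if a pipeline has neither. Internally, the fixed time-based measurement loop is replaced by a StopCriterion interface with ElapsedTimeCriterion and NumItersCriterion implementations, and the per-backend _run() methods are split into init() / run_iter() / deinit() hooks driven by a common Pipeline::run() loop.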
Alexander Smorkalov committed 5389d9e0d4 via GitHub
  1. modules/gapi/samples/pipeline_modeling_tool.cpp (29 changes)
  2. modules/gapi/samples/pipeline_modeling_tool/pipeline.hpp (134 changes)
  3. modules/gapi/samples/pipeline_modeling_tool/pipeline_builder.hpp (68 changes)
  4. modules/gapi/samples/pipeline_modeling_tool/test_pipeline_modeling_tool.py (37 changes)
  5. modules/gapi/samples/pipeline_modeling_tool/utils.hpp (6 changes)

modules/gapi/samples/pipeline_modeling_tool.cpp
@@ -347,10 +347,14 @@ int main(int argc, char* argv[]) {
std::map<std::string, std::string>{{"CACHE_DIR", cached_dir}};
}
const double work_time_ms =
check_and_read<double>(fs, "work_time", "Config");
if (work_time_ms < 0) {
throw std::logic_error("work_time must be positive");
auto opt_work_time_ms = readOpt<double>(fs["work_time"]);
cv::optional<int64_t> opt_work_time_mcs;
if (opt_work_time_ms) {
const double work_time_ms = opt_work_time_ms.value();
if (work_time_ms < 0) {
throw std::logic_error("work_time must be positive");
}
opt_work_time_mcs = cv::optional<int64_t>(utils::ms_to_mcs(work_time_ms));
}
auto pipelines_fn = check_and_get_fn(fs, "Pipelines", "Config");
@@ -369,6 +373,21 @@ int main(int argc, char* argv[]) {
for (const auto& name : exec_list) {
const auto& pl_fn = check_and_get_fn(pipelines_fn, name, "Pipelines");
builder.setName(name);
StopCriterion::Ptr stop_criterion;
auto opt_num_iters = readOpt<int>(pl_fn["num_iters"]);
// NB: num_iters for specific pipeline takes priority over global work_time.
if (opt_num_iters) {
stop_criterion.reset(new NumItersCriterion(opt_num_iters.value()));
} else if (opt_work_time_mcs) {
stop_criterion.reset(new ElapsedTimeCriterion(opt_work_time_mcs.value()));
} else {
throw std::logic_error(
"Failed: Pipeline " + name + " doesn't have stop criterion!\n"
"Please specify either work_time: <value> in the config root"
" or num_iters: <value> for specific pipeline.");
}
builder.setStopCriterion(std::move(stop_criterion));
// NB: Set source
{
const auto& src_fn = check_and_get_fn(pl_fn, "source", name);
@@ -464,7 +483,7 @@ int main(int argc, char* argv[]) {
for (size_t i = 0; i < pipelines.size(); ++i) {
threads[i] = std::thread([&, i]() {
try {
pipelines[i]->run(work_time_ms);
pipelines[i]->run();
} catch (...) {
eptrs[i] = std::current_exception();
}

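In the hunks above, work_time moves from a mandatory check_and_read to an optional readOpt, so a missing work_time is no longer an immediate error. readOpt itself is not shown in this diff; purely as an assumption about its shape (the tool's real helper may differ, and the tool also uses cv::optional in places), an optional cv::FileNode reader along these lines would fit the call sites:

#include <opencv2/core/persistence.hpp>    // cv::FileNode
#include <opencv2/gapi/util/optional.hpp>  // cv::util::optional

// Hypothetical sketch of an optional reader: empty node -> empty optional,
// otherwise read the value through FileNode's operator>>.
template <typename T>
cv::util::optional<T> readOpt(const cv::FileNode& fn) {
    if (fn.empty()) {
        return cv::util::optional<T>{};
    }
    T value;
    fn >> value;
    return cv::util::optional<T>{value};
}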
modules/gapi/samples/pipeline_modeling_tool/pipeline.hpp
@@ -40,6 +40,16 @@ std::string PerfReport::toStr(bool expand) const {
return ss.str();
}
class StopCriterion {
public:
using Ptr = std::unique_ptr<StopCriterion>;
virtual void start() = 0;
virtual void iter() = 0;
virtual bool done() = 0;
virtual ~StopCriterion() = default;
};
class Pipeline {
public:
using Ptr = std::shared_ptr<Pipeline>;
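The StopCriterion interface introduced here is a plain start/iter/done protocol, which makes it easy to layer new policies on top. As a purely illustrative sketch that is not part of this change (it reuses only the declaration above plus <utility>), a criterion that stops as soon as either of two child criteria fires could look like:

#include <utility>  // std::move

// Hypothetical composite criterion (not in this PR): done() as soon as ANY child is done.
class AnyOfCriterion : public StopCriterion {
public:
    AnyOfCriterion(StopCriterion::Ptr lhs, StopCriterion::Ptr rhs)
        : m_lhs(std::move(lhs)), m_rhs(std::move(rhs)) {
    }

    void start() override { m_lhs->start(); m_rhs->start(); }
    void iter()  override { m_lhs->iter();  m_rhs->iter();  }
    bool done()  override { return m_lhs->done() || m_rhs->done(); }

private:
    StopCriterion::Ptr m_lhs;
    StopCriterion::Ptr m_rhs;
};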
@@ -47,28 +57,28 @@ public:
Pipeline(std::string&& name,
cv::GComputation&& comp,
std::shared_ptr<DummySource>&& src,
StopCriterion::Ptr stop_criterion,
cv::GCompileArgs&& args,
const size_t num_outputs);
void compile();
void run(double work_time_ms);
void run();
const PerfReport& report() const;
const std::string& name() const { return m_name;}
virtual ~Pipeline() = default;
protected:
struct RunPerf {
int64_t elapsed = 0;
std::vector<int64_t> latencies;
};
virtual void _compile() = 0;
virtual RunPerf _run(double work_time_ms) = 0;
virtual void _compile() = 0;
virtual int64_t run_iter() = 0;
virtual void init() {};
virtual void deinit() {};
std::string m_name;
cv::GComputation m_comp;
std::shared_ptr<DummySource> m_src;
StopCriterion::Ptr m_stop_criterion;
cv::GCompileArgs m_args;
size_t m_num_outputs;
PerfReport m_perf;
@@ -77,11 +87,13 @@ protected:
Pipeline::Pipeline(std::string&& name,
cv::GComputation&& comp,
std::shared_ptr<DummySource>&& src,
StopCriterion::Ptr stop_criterion,
cv::GCompileArgs&& args,
const size_t num_outputs)
: m_name(std::move(name)),
m_comp(std::move(comp)),
m_src(std::move(src)),
m_stop_criterion(std::move(stop_criterion)),
m_args(std::move(args)),
m_num_outputs(num_outputs) {
m_perf.name = m_name;
@@ -94,11 +106,23 @@ void Pipeline::compile() {
});
}
void Pipeline::run(double work_time_ms) {
auto run_perf = _run(work_time_ms);
void Pipeline::run() {
using namespace std::chrono;
init();
auto start = high_resolution_clock::now();
m_stop_criterion->start();
while (true) {
m_perf.latencies.push_back(run_iter());
m_perf.elapsed = duration_cast<milliseconds>(high_resolution_clock::now() - start).count();
m_stop_criterion->iter();
if (m_stop_criterion->done()) {
deinit();
break;
}
}
m_perf.elapsed = run_perf.elapsed;
m_perf.latencies = std::move(run_perf.latencies);
m_perf.avg_latency = utils::avg(m_perf.latencies);
m_perf.min_latency = utils::min(m_perf.latencies);
m_perf.max_latency = utils::max(m_perf.latencies);
@@ -113,7 +137,6 @@ void Pipeline::run(double work_time_ms) {
m_perf.throughput =
(m_perf.latencies.size() / static_cast<double>(m_perf.elapsed)) * 1000;
}
const PerfReport& Pipeline::report() const {
@@ -131,39 +154,31 @@ private:
cv::GCompileArgs(m_args));
}
Pipeline::RunPerf _run(double work_time_ms) override {
// NB: Setup.
virtual void init() override {
using namespace std::chrono;
// NB: N-1 buffers + timestamp.
std::vector<cv::Mat> out_mats(m_num_outputs - 1);
int64_t start_ts = -1;
cv::GRunArgsP pipeline_outputs;
for (auto& m : out_mats) {
pipeline_outputs += cv::gout(m);
m_out_mats.resize(m_num_outputs - 1);
for (auto& m : m_out_mats) {
m_pipeline_outputs += cv::gout(m);
}
pipeline_outputs += cv::gout(start_ts);
m_pipeline_outputs += cv::gout(m_start_ts);
m_compiled.setSource(m_src);
// NB: Start execution & measure performance statistics.
Pipeline::RunPerf perf;
auto start = high_resolution_clock::now();
m_compiled.start();
while (m_compiled.pull(cv::GRunArgsP{pipeline_outputs})) {
int64_t latency = utils::timestamp<milliseconds>() - start_ts;
perf.latencies.push_back(latency);
perf.elapsed = duration_cast<milliseconds>(
high_resolution_clock::now() - start).count();
if (perf.elapsed >= work_time_ms) {
m_compiled.stop();
break;
}
};
return perf;
}
virtual void deinit() override {
m_compiled.stop();
}
virtual int64_t run_iter() override {
m_compiled.pull(cv::GRunArgsP{m_pipeline_outputs});
return utils::timestamp<std::chrono::milliseconds>() - m_start_ts;
}
cv::GStreamingCompiled m_compiled;
cv::GRunArgsP m_pipeline_outputs;
std::vector<cv::Mat> m_out_mats;
int64_t m_start_ts;
};
class RegularPipeline : public Pipeline {
@@ -177,37 +192,26 @@ private:
cv::GCompileArgs(m_args));
}
Pipeline::RunPerf _run(double work_time_ms) override {
// NB: Setup
using namespace std::chrono;
cv::gapi::wip::Data d;
std::vector<cv::Mat> out_mats(m_num_outputs);
cv::GRunArgsP pipeline_outputs;
for (auto& m : out_mats) {
pipeline_outputs += cv::gout(m);
virtual void init() override {
m_out_mats.resize(m_num_outputs);
for (auto& m : m_out_mats) {
m_pipeline_outputs += cv::gout(m);
}
}
// NB: Start execution & measure performance statistics.
Pipeline::RunPerf perf;
auto start = high_resolution_clock::now();
while (m_src->pull(d)) {
auto in_mat = cv::util::get<cv::Mat>(d);
int64_t latency = utils::measure<milliseconds>([&]{
m_compiled(cv::gin(in_mat), cv::GRunArgsP{pipeline_outputs});
});
perf.latencies.push_back(latency);
perf.elapsed = duration_cast<milliseconds>(
high_resolution_clock::now() - start).count();
if (perf.elapsed >= work_time_ms) {
break;
}
};
return perf;
virtual int64_t run_iter() override {
using namespace std::chrono;
cv::gapi::wip::Data d;
m_src->pull(d);
auto in_mat = cv::util::get<cv::Mat>(d);
return utils::measure<milliseconds>([&]{
m_compiled(cv::gin(in_mat), cv::GRunArgsP{m_pipeline_outputs});
});
}
cv::GCompiled m_compiled;
cv::GCompiled m_compiled;
cv::GRunArgsP m_pipeline_outputs;
std::vector<cv::Mat> m_out_mats;
};
enum class PLMode {

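The two backends above no longer own their measurement loops: Pipeline::run() drives the iterations and the stop criterion, while StreamingPipeline and RegularPipeline only provide init() / run_iter() / deinit(). A stripped-down, self-contained illustration of that split (toy classes, not the tool's code) behaves like this:

#include <cstdint>
#include <iostream>
#include <vector>

// Toy stand-ins for Pipeline/StopCriterion: the base class owns the run loop,
// derived classes implement the per-iteration work, and a criterion object
// decides when to stop -- mirroring the structure of the refactor above.
struct CountdownCriterion {
    int64_t left;
    void start() {}                        // nothing to reset in this toy version
    void iter()  { --left; }
    bool done() const { return left <= 0; }
};

class MiniPipeline {
public:
    explicit MiniPipeline(CountdownCriterion c) : m_criterion(c) {}
    virtual ~MiniPipeline() = default;

    void run() {
        init();
        m_criterion.start();
        while (true) {
            m_latencies.push_back(run_iter());
            m_criterion.iter();
            if (m_criterion.done()) {
                deinit();
                break;
            }
        }
    }

    const std::vector<int64_t>& latencies() const { return m_latencies; }

protected:
    virtual void init()   {}
    virtual void deinit() {}
    virtual int64_t run_iter() = 0;        // one iteration, returns its latency

private:
    CountdownCriterion   m_criterion;
    std::vector<int64_t> m_latencies;
};

// A trivial "backend" whose every iteration reports a 1 ms latency.
class ConstLatencyPipeline final : public MiniPipeline {
public:
    using MiniPipeline::MiniPipeline;
private:
    int64_t run_iter() override { return 1; }
};

int main() {
    ConstLatencyPipeline pl{CountdownCriterion{3}};
    pl.run();
    std::cout << pl.latencies().size() << " iterations\n";  // prints "3 iterations"
    return 0;
}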
modules/gapi/samples/pipeline_modeling_tool/pipeline_builder.hpp
@@ -262,6 +262,65 @@ struct InferParams {
cv::util::optional<int> out_precision;
};
class ElapsedTimeCriterion : public StopCriterion {
public:
ElapsedTimeCriterion(int64_t work_time_mcs);
void start() override;
void iter() override;
bool done() override;
private:
int64_t m_work_time_mcs;
int64_t m_start_ts = -1;
int64_t m_curr_ts = -1;
};
ElapsedTimeCriterion::ElapsedTimeCriterion(int64_t work_time_mcs)
: m_work_time_mcs(work_time_mcs) {
};
void ElapsedTimeCriterion::start() {
m_start_ts = m_curr_ts = utils::timestamp<std::chrono::microseconds>();
}
void ElapsedTimeCriterion::iter() {
m_curr_ts = utils::timestamp<std::chrono::microseconds>();
}
bool ElapsedTimeCriterion::done() {
return (m_curr_ts - m_start_ts) >= m_work_time_mcs;
}
class NumItersCriterion : public StopCriterion {
public:
NumItersCriterion(int64_t num_iters);
void start() override;
void iter() override;
bool done() override;
private:
int64_t m_num_iters;
int64_t m_curr_iters = 0;
};
NumItersCriterion::NumItersCriterion(int64_t num_iters)
: m_num_iters(num_iters) {
}
void NumItersCriterion::start() {
m_curr_iters = 0;
}
void NumItersCriterion::iter() {
++m_curr_iters;
}
bool NumItersCriterion::done() {
return m_curr_iters == m_num_iters;
}
class PipelineBuilder {
public:
PipelineBuilder();
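For reference, the intended semantics of the two criteria above (done() only once num_iters iterations have been counted, or once the elapsed time reaches the microsecond budget) can be exercised as follows; the snippet assumes the declarations above and utils::ms_to_mcs from this PR are visible:

#include <cassert>

void demo_criteria() {
    NumItersCriterion by_count(3);
    by_count.start();
    by_count.iter();
    by_count.iter();
    assert(!by_count.done());            // only 2 of 3 iterations counted so far
    by_count.iter();
    assert(by_count.done());             // done exactly on the 3rd iteration

    // A 100 ms budget, stored in microseconds the same way main() does it.
    ElapsedTimeCriterion by_time(utils::ms_to_mcs(100.0));
    by_time.start();
    by_time.iter();
    // done() turns true once at least 100000 us passed between start() and the last iter().
}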
@@ -279,6 +338,7 @@ public:
void setDumpFilePath(const std::string& dump);
void setQueueCapacity(const size_t qc);
void setName(const std::string& name);
void setStopCriterion(StopCriterion::Ptr stop_criterion);
Pipeline::Ptr build();
@@ -306,6 +366,7 @@ private:
std::shared_ptr<DummySource> src;
PLMode mode = PLMode::STREAMING;
std::string name;
StopCriterion::Ptr stop_criterion;
};
std::unique_ptr<State> m_state;
@@ -432,6 +493,10 @@ void PipelineBuilder::setName(const std::string& name) {
m_state->name = name;
}
void PipelineBuilder::setStopCriterion(StopCriterion::Ptr stop_criterion) {
m_state->stop_criterion = std::move(stop_criterion);
}
static bool visit(Node::Ptr node,
std::vector<Node::Ptr>& sorted,
std::unordered_map<Node::Ptr, int>& visited) {
@@ -590,6 +655,7 @@ Pipeline::Ptr PipelineBuilder::construct() {
}
}
GAPI_Assert(m_state->stop_criterion);
if (m_state->mode == PLMode::STREAMING) {
GAPI_Assert(graph_inputs.size() == 1);
GAPI_Assert(cv::util::holds_alternative<cv::GMat>(graph_inputs[0]));
@@ -605,6 +671,7 @@ Pipeline::Ptr PipelineBuilder::construct() {
cv::GProtoInputArgs{graph_inputs},
cv::GProtoOutputArgs{graph_outputs}),
std::move(m_state->src),
std::move(m_state->stop_criterion),
std::move(m_state->compile_args),
graph_outputs.size());
}
@@ -614,6 +681,7 @@ Pipeline::Ptr PipelineBuilder::construct() {
cv::GProtoInputArgs{graph_inputs},
cv::GProtoOutputArgs{graph_outputs}),
std::move(m_state->src),
std::move(m_state->stop_criterion),
std::move(m_state->compile_args),
graph_outputs.size());
}

modules/gapi/samples/pipeline_modeling_tool/test_pipeline_modeling_tool.py
@@ -26,14 +26,6 @@ def test_error_no_config_exists():
assert 'Failed to open config file: not_existing_cfg.yml' in out
def test_error_no_work_time():
cfg_file = """\"%YAML:1.0\" """
exec_str = '{} --cfg={}'.format(pipeline_modeling_tool, cfg_file)
out = get_output(exec_str)
assert out.startswith('Config must contain field: work_time')
def test_error_work_time_not_positive():
cfg_file = """\"%YAML:1.0
work_time: -1\" """
@@ -77,7 +69,8 @@ def test_error_no_source():
cfg_file = """\"%YAML:1.0
work_time: 1000
Pipelines:
PL1:\" """
PL1:
queue_capacity: 1\" """
exec_str = '{} --cfg={}'.format(pipeline_modeling_tool, cfg_file)
out = get_output(exec_str)
@@ -982,3 +975,29 @@
check(cfg_file, -3)
check(cfg_file, 0)
def test_error_no_worktime_and_num_iters():
cfg_file = """\"%YAML:1.0
Pipelines:
PL1:
source:
name: 'Src'
latency: 20
output:
dims: [1,1]
precision: 'U8'
nodes:
- name: 'Node0'
type: 'Dummy'
time: 0.2
output:
dims: [1,2,3,4]
precision: 'U8'
edges:
- from: 'Src'
to: 'Node0'\" """
exec_str = '{} --cfg={}'.format(pipeline_modeling_tool, cfg_file)
out = get_output(exec_str)
assert out.startswith('Failed: Pipeline PL1 doesn\'t have stop criterion!')

modules/gapi/samples/pipeline_modeling_tool/utils.hpp
@@ -119,6 +119,12 @@ T min(const std::vector<T>& vec) {
return *std::min_element(vec.begin(), vec.end());
}
template <typename T>
int64_t ms_to_mcs(T ms) {
using namespace std::chrono;
return duration_cast<microseconds>(duration<T, std::milli>(ms)).count();
}
} // namespace utils
#endif // OPENCV_GAPI_PIPELINE_MODELING_TOOL_UTILS_HPP
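The new utils::ms_to_mcs goes through std::chrono so fractional millisecond values keep their precision when converted to integral microseconds, e.g. 1.5 ms becomes 1500 us. A quick stand-alone sanity check of the same conversion:

#include <cassert>
#include <chrono>
#include <cstdint>

// Same conversion as utils::ms_to_mcs above, reproduced for a self-contained check.
template <typename T>
int64_t ms_to_mcs(T ms) {
    using namespace std::chrono;
    return duration_cast<microseconds>(duration<T, std::milli>(ms)).count();
}

int main() {
    assert(ms_to_mcs(1.5)    == 1500);     // fractional milliseconds are preserved
    assert(ms_to_mcs(1000.0) == 1000000);  // 1 second worth of milliseconds
    return 0;
}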
