diff --git a/modules/gapi/samples/pipeline_modeling_tool.cpp b/modules/gapi/samples/pipeline_modeling_tool.cpp index fda1c62985..7c202642a9 100644 --- a/modules/gapi/samples/pipeline_modeling_tool.cpp +++ b/modules/gapi/samples/pipeline_modeling_tool.cpp @@ -347,10 +347,14 @@ int main(int argc, char* argv[]) { std::map{{"CACHE_DIR", cached_dir}}; } - const double work_time_ms = - check_and_read(fs, "work_time", "Config"); - if (work_time_ms < 0) { - throw std::logic_error("work_time must be positive"); + auto opt_work_time_ms = readOpt(fs["work_time"]); + cv::optional opt_work_time_mcs; + if (opt_work_time_ms) { + const double work_time_ms = opt_work_time_ms.value(); + if (work_time_ms < 0) { + throw std::logic_error("work_time must be positive"); + } + opt_work_time_mcs = cv::optional(utils::ms_to_mcs(work_time_ms)); } auto pipelines_fn = check_and_get_fn(fs, "Pipelines", "Config"); @@ -369,6 +373,21 @@ int main(int argc, char* argv[]) { for (const auto& name : exec_list) { const auto& pl_fn = check_and_get_fn(pipelines_fn, name, "Pipelines"); builder.setName(name); + StopCriterion::Ptr stop_criterion; + auto opt_num_iters = readOpt(pl_fn["num_iters"]); + // NB: num_iters for specific pipeline takes priority over global work_time. + if (opt_num_iters) { + stop_criterion.reset(new NumItersCriterion(opt_num_iters.value())); + } else if (opt_work_time_mcs) { + stop_criterion.reset(new ElapsedTimeCriterion(opt_work_time_mcs.value())); + } else { + throw std::logic_error( + "Failed: Pipeline " + name + " doesn't have stop criterion!\n" + "Please specify either work_time: in the config root" + " or num_iters: for specific pipeline."); + } + builder.setStopCriterion(std::move(stop_criterion)); + // NB: Set source { const auto& src_fn = check_and_get_fn(pl_fn, "source", name); @@ -464,7 +483,7 @@ int main(int argc, char* argv[]) { for (size_t i = 0; i < pipelines.size(); ++i) { threads[i] = std::thread([&, i]() { try { - pipelines[i]->run(work_time_ms); + pipelines[i]->run(); } catch (...) { eptrs[i] = std::current_exception(); } diff --git a/modules/gapi/samples/pipeline_modeling_tool/pipeline.hpp b/modules/gapi/samples/pipeline_modeling_tool/pipeline.hpp index 91107c6dad..ac192cba52 100644 --- a/modules/gapi/samples/pipeline_modeling_tool/pipeline.hpp +++ b/modules/gapi/samples/pipeline_modeling_tool/pipeline.hpp @@ -40,6 +40,16 @@ std::string PerfReport::toStr(bool expand) const { return ss.str(); } +class StopCriterion { +public: + using Ptr = std::unique_ptr; + + virtual void start() = 0; + virtual void iter() = 0; + virtual bool done() = 0; + virtual ~StopCriterion() = default; +}; + class Pipeline { public: using Ptr = std::shared_ptr; @@ -47,28 +57,28 @@ public: Pipeline(std::string&& name, cv::GComputation&& comp, std::shared_ptr&& src, + StopCriterion::Ptr stop_criterion, cv::GCompileArgs&& args, const size_t num_outputs); void compile(); - void run(double work_time_ms); + void run(); + const PerfReport& report() const; const std::string& name() const { return m_name;} virtual ~Pipeline() = default; protected: - struct RunPerf { - int64_t elapsed = 0; - std::vector latencies; - }; - - virtual void _compile() = 0; - virtual RunPerf _run(double work_time_ms) = 0; + virtual void _compile() = 0; + virtual int64_t run_iter() = 0; + virtual void init() {}; + virtual void deinit() {}; std::string m_name; cv::GComputation m_comp; std::shared_ptr m_src; + StopCriterion::Ptr m_stop_criterion; cv::GCompileArgs m_args; size_t m_num_outputs; PerfReport m_perf; @@ -77,11 +87,13 @@ protected: Pipeline::Pipeline(std::string&& name, cv::GComputation&& comp, std::shared_ptr&& src, + StopCriterion::Ptr stop_criterion, cv::GCompileArgs&& args, const size_t num_outputs) : m_name(std::move(name)), m_comp(std::move(comp)), m_src(std::move(src)), + m_stop_criterion(std::move(stop_criterion)), m_args(std::move(args)), m_num_outputs(num_outputs) { m_perf.name = m_name; @@ -94,11 +106,23 @@ void Pipeline::compile() { }); } -void Pipeline::run(double work_time_ms) { - auto run_perf = _run(work_time_ms); +void Pipeline::run() { + using namespace std::chrono; + + init(); + auto start = high_resolution_clock::now(); + m_stop_criterion->start(); + while (true) { + m_perf.latencies.push_back(run_iter()); + m_perf.elapsed = duration_cast(high_resolution_clock::now() - start).count(); + m_stop_criterion->iter(); + + if (m_stop_criterion->done()) { + deinit(); + break; + } + } - m_perf.elapsed = run_perf.elapsed; - m_perf.latencies = std::move(run_perf.latencies); m_perf.avg_latency = utils::avg(m_perf.latencies); m_perf.min_latency = utils::min(m_perf.latencies); m_perf.max_latency = utils::max(m_perf.latencies); @@ -113,7 +137,6 @@ void Pipeline::run(double work_time_ms) { m_perf.throughput = (m_perf.latencies.size() / static_cast(m_perf.elapsed)) * 1000; - } const PerfReport& Pipeline::report() const { @@ -131,39 +154,31 @@ private: cv::GCompileArgs(m_args)); } - Pipeline::RunPerf _run(double work_time_ms) override { - // NB: Setup. + virtual void init() override { using namespace std::chrono; // NB: N-1 buffers + timestamp. - std::vector out_mats(m_num_outputs - 1); - int64_t start_ts = -1; - cv::GRunArgsP pipeline_outputs; - for (auto& m : out_mats) { - pipeline_outputs += cv::gout(m); + m_out_mats.resize(m_num_outputs - 1); + for (auto& m : m_out_mats) { + m_pipeline_outputs += cv::gout(m); } - pipeline_outputs += cv::gout(start_ts); + m_pipeline_outputs += cv::gout(m_start_ts); m_compiled.setSource(m_src); - - // NB: Start execution & measure performance statistics. - Pipeline::RunPerf perf; - auto start = high_resolution_clock::now(); m_compiled.start(); - while (m_compiled.pull(cv::GRunArgsP{pipeline_outputs})) { - int64_t latency = utils::timestamp() - start_ts; - - perf.latencies.push_back(latency); - perf.elapsed = duration_cast( - high_resolution_clock::now() - start).count(); - - if (perf.elapsed >= work_time_ms) { - m_compiled.stop(); - break; - } - }; - return perf; + } + + virtual void deinit() override { + m_compiled.stop(); + } + + virtual int64_t run_iter() override { + m_compiled.pull(cv::GRunArgsP{m_pipeline_outputs}); + return utils::timestamp() - m_start_ts; } cv::GStreamingCompiled m_compiled; + cv::GRunArgsP m_pipeline_outputs; + std::vector m_out_mats; + int64_t m_start_ts; }; class RegularPipeline : public Pipeline { @@ -177,37 +192,26 @@ private: cv::GCompileArgs(m_args)); } - Pipeline::RunPerf _run(double work_time_ms) override { - // NB: Setup - using namespace std::chrono; - cv::gapi::wip::Data d; - std::vector out_mats(m_num_outputs); - cv::GRunArgsP pipeline_outputs; - for (auto& m : out_mats) { - pipeline_outputs += cv::gout(m); + virtual void init() override { + m_out_mats.resize(m_num_outputs); + for (auto& m : m_out_mats) { + m_pipeline_outputs += cv::gout(m); } + } - // NB: Start execution & measure performance statistics. - Pipeline::RunPerf perf; - auto start = high_resolution_clock::now(); - while (m_src->pull(d)) { - auto in_mat = cv::util::get(d); - int64_t latency = utils::measure([&]{ - m_compiled(cv::gin(in_mat), cv::GRunArgsP{pipeline_outputs}); - }); - - perf.latencies.push_back(latency); - perf.elapsed = duration_cast( - high_resolution_clock::now() - start).count(); - - if (perf.elapsed >= work_time_ms) { - break; - } - }; - return perf; + virtual int64_t run_iter() override { + using namespace std::chrono; + cv::gapi::wip::Data d; + m_src->pull(d); + auto in_mat = cv::util::get(d); + return utils::measure([&]{ + m_compiled(cv::gin(in_mat), cv::GRunArgsP{m_pipeline_outputs}); + }); } - cv::GCompiled m_compiled; + cv::GCompiled m_compiled; + cv::GRunArgsP m_pipeline_outputs; + std::vector m_out_mats; }; enum class PLMode { diff --git a/modules/gapi/samples/pipeline_modeling_tool/pipeline_builder.hpp b/modules/gapi/samples/pipeline_modeling_tool/pipeline_builder.hpp index 094f4235e3..6ac6374f07 100644 --- a/modules/gapi/samples/pipeline_modeling_tool/pipeline_builder.hpp +++ b/modules/gapi/samples/pipeline_modeling_tool/pipeline_builder.hpp @@ -262,6 +262,65 @@ struct InferParams { cv::util::optional out_precision; }; +class ElapsedTimeCriterion : public StopCriterion { +public: + ElapsedTimeCriterion(int64_t work_time_mcs); + + void start() override; + void iter() override; + bool done() override; + +private: + int64_t m_work_time_mcs; + int64_t m_start_ts = -1; + int64_t m_curr_ts = -1; +}; + +ElapsedTimeCriterion::ElapsedTimeCriterion(int64_t work_time_mcs) + : m_work_time_mcs(work_time_mcs) { +}; + +void ElapsedTimeCriterion::start() { + m_start_ts = m_curr_ts = utils::timestamp(); +} + +void ElapsedTimeCriterion::iter() { + m_curr_ts = utils::timestamp(); +} + +bool ElapsedTimeCriterion::done() { + return (m_curr_ts - m_start_ts) >= m_work_time_mcs; +} + +class NumItersCriterion : public StopCriterion { +public: + NumItersCriterion(int64_t num_iters); + + void start() override; + void iter() override; + bool done() override; + +private: + int64_t m_num_iters; + int64_t m_curr_iters = 0; +}; + +NumItersCriterion::NumItersCriterion(int64_t num_iters) + : m_num_iters(num_iters) { +} + +void NumItersCriterion::start() { + m_curr_iters = 0; +} + +void NumItersCriterion::iter() { + ++m_curr_iters; +} + +bool NumItersCriterion::done() { + return m_curr_iters == m_num_iters; +} + class PipelineBuilder { public: PipelineBuilder(); @@ -279,6 +338,7 @@ public: void setDumpFilePath(const std::string& dump); void setQueueCapacity(const size_t qc); void setName(const std::string& name); + void setStopCriterion(StopCriterion::Ptr stop_criterion); Pipeline::Ptr build(); @@ -306,6 +366,7 @@ private: std::shared_ptr src; PLMode mode = PLMode::STREAMING; std::string name; + StopCriterion::Ptr stop_criterion; }; std::unique_ptr m_state; @@ -432,6 +493,10 @@ void PipelineBuilder::setName(const std::string& name) { m_state->name = name; } +void PipelineBuilder::setStopCriterion(StopCriterion::Ptr stop_criterion) { + m_state->stop_criterion = std::move(stop_criterion); +} + static bool visit(Node::Ptr node, std::vector& sorted, std::unordered_map& visited) { @@ -590,6 +655,7 @@ Pipeline::Ptr PipelineBuilder::construct() { } } + GAPI_Assert(m_state->stop_criterion); if (m_state->mode == PLMode::STREAMING) { GAPI_Assert(graph_inputs.size() == 1); GAPI_Assert(cv::util::holds_alternative(graph_inputs[0])); @@ -605,6 +671,7 @@ Pipeline::Ptr PipelineBuilder::construct() { cv::GProtoInputArgs{graph_inputs}, cv::GProtoOutputArgs{graph_outputs}), std::move(m_state->src), + std::move(m_state->stop_criterion), std::move(m_state->compile_args), graph_outputs.size()); } @@ -614,6 +681,7 @@ Pipeline::Ptr PipelineBuilder::construct() { cv::GProtoInputArgs{graph_inputs}, cv::GProtoOutputArgs{graph_outputs}), std::move(m_state->src), + std::move(m_state->stop_criterion), std::move(m_state->compile_args), graph_outputs.size()); } diff --git a/modules/gapi/samples/pipeline_modeling_tool/test_pipeline_modeling_tool.py b/modules/gapi/samples/pipeline_modeling_tool/test_pipeline_modeling_tool.py index d56a0399e9..d1701d9ad2 100644 --- a/modules/gapi/samples/pipeline_modeling_tool/test_pipeline_modeling_tool.py +++ b/modules/gapi/samples/pipeline_modeling_tool/test_pipeline_modeling_tool.py @@ -26,14 +26,6 @@ def test_error_no_config_exists(): assert 'Failed to open config file: not_existing_cfg.yml' in out -def test_error_no_work_time(): - cfg_file = """\"%YAML:1.0\" """ - - exec_str = '{} --cfg={}'.format(pipeline_modeling_tool, cfg_file) - out = get_output(exec_str) - assert out.startswith('Config must contain field: work_time') - - def test_error_work_time_not_positive(): cfg_file = """\"%YAML:1.0 work_time: -1\" """ @@ -77,7 +69,8 @@ def test_error_no_source(): cfg_file = """\"%YAML:1.0 work_time: 1000 Pipelines: - PL1:\" """ + PL1: + queue_capacity: 1\" """ exec_str = '{} --cfg={}'.format(pipeline_modeling_tool, cfg_file) out = get_output(exec_str) @@ -982,3 +975,29 @@ Pipelines: check(cfg_file, -3) check(cfg_file, 0) + + +def test_error_no_worktime_and_num_iters(): + cfg_file = """\"%YAML:1.0 +Pipelines: + PL1: + source: + name: 'Src' + latency: 20 + output: + dims: [1,1] + precision: 'U8' + nodes: + - name: 'Node0' + type: 'Dummy' + time: 0.2 + output: + dims: [1,2,3,4] + precision: 'U8' + edges: + - from: 'Src' + to: 'Node0'\" """ + + exec_str = '{} --cfg={}'.format(pipeline_modeling_tool, cfg_file) + out = get_output(exec_str) + assert out.startswith('Failed: Pipeline PL1 doesn\'t have stop criterion!') diff --git a/modules/gapi/samples/pipeline_modeling_tool/utils.hpp b/modules/gapi/samples/pipeline_modeling_tool/utils.hpp index 6eb6bb7202..c0f0897c35 100644 --- a/modules/gapi/samples/pipeline_modeling_tool/utils.hpp +++ b/modules/gapi/samples/pipeline_modeling_tool/utils.hpp @@ -119,6 +119,12 @@ T min(const std::vector& vec) { return *std::min_element(vec.begin(), vec.end()); } +template +int64_t ms_to_mcs(T ms) { + using namespace std::chrono; + return duration_cast(duration(ms)).count(); +} + } // namespace utils #endif // OPENCV_GAPI_PIPELINE_MODELING_TOOL_UTILS_HPP