Support num_iters criteria for pipeline tool

pull/22596/head
TolyaTalamanov 2 years ago
parent bf5d7c0c10
commit 1113c9ab10
  1. modules/gapi/samples/pipeline_modeling_tool.cpp (29 changed lines)
  2. modules/gapi/samples/pipeline_modeling_tool/pipeline.hpp (134 changed lines)
  3. modules/gapi/samples/pipeline_modeling_tool/pipeline_builder.hpp (52 changed lines)
  4. modules/gapi/samples/pipeline_modeling_tool/utils.hpp (6 changed lines)

modules/gapi/samples/pipeline_modeling_tool.cpp
@@ -347,10 +347,14 @@ int main(int argc, char* argv[]) {
std::map<std::string, std::string>{{"CACHE_DIR", cached_dir}};
}
const double work_time_ms =
check_and_read<double>(fs, "work_time", "Config");
if (work_time_ms < 0) {
throw std::logic_error("work_time must be positive");
auto opt_work_time_ms = readOpt<double>(fs["work_time"]);
cv::optional<int64_t> opt_work_time_mcs;
if (opt_work_time_ms) {
const double work_time_ms = opt_work_time_ms.value();
if (work_time_ms < 0) {
throw std::logic_error("work_time must be positive");
}
opt_work_time_mcs = cv::optional<int64_t>(utils::ms_to_mcs(work_time_ms));
}
auto pipelines_fn = check_and_get_fn(fs, "Pipelines", "Config");
@@ -369,6 +373,21 @@ int main(int argc, char* argv[]) {
for (const auto& name : exec_list) {
const auto& pl_fn = check_and_get_fn(pipelines_fn, name, "Pipelines");
builder.setName(name);
StopCriteria::Ptr stop_criteria;
auto opt_num_iters = readOpt<int>(pl_fn["num_iters"]);
// NB: num_iters for specific pipeline takes priority over global work_time.
if (opt_num_iters) {
stop_criteria.reset(new NumItersCriteria(opt_num_iters.value()));
} else if (opt_work_time_mcs) {
stop_criteria.reset(new ElapsedTimeCriteria(opt_work_time_mcs.value()));
} else {
throw std::logic_error(
"Failed: Pipeline " + name + " doesn't have stop criteria!\n"
"Please specify either work_time: <value> in the config root"
" or num_iters: <value> for specific pipeline.");
}
builder.setStopCriteria(std::move(stop_criteria));
// NB: Set source
{
const auto& src_fn = check_and_get_fn(pl_fn, "source", name);
@@ -464,7 +483,7 @@ int main(int argc, char* argv[]) {
for (size_t i = 0; i < pipelines.size(); ++i) {
threads[i] = std::thread([&, i]() {
try {
pipelines[i]->run(work_time_ms);
pipelines[i]->run();
} catch (...) {
eptrs[i] = std::current_exception();
}

modules/gapi/samples/pipeline_modeling_tool/pipeline.hpp
@@ -40,6 +40,16 @@ std::string PerfReport::toStr(bool expand) const {
return ss.str();
}
class StopCriteria {
public:
using Ptr = std::unique_ptr<StopCriteria>;
virtual void start() = 0;
virtual void iter() = 0;
virtual bool isOver() = 0;
virtual ~StopCriteria() = default;
};
class Pipeline {
public:
using Ptr = std::shared_ptr<Pipeline>;
@@ -47,28 +57,28 @@ public:
Pipeline(std::string&& name,
cv::GComputation&& comp,
std::shared_ptr<DummySource>&& src,
StopCriteria::Ptr stop_criteria,
cv::GCompileArgs&& args,
const size_t num_outputs);
void compile();
void run(double work_time_ms);
void run();
const PerfReport& report() const;
const std::string& name() const { return m_name;}
virtual ~Pipeline() = default;
protected:
struct RunPerf {
int64_t elapsed = 0;
std::vector<int64_t> latencies;
};
virtual void _compile() = 0;
virtual RunPerf _run(double work_time_ms) = 0;
virtual void _compile() = 0;
virtual int64_t run_iter() = 0;
virtual void init() {};
virtual void deinit() {};
std::string m_name;
cv::GComputation m_comp;
std::shared_ptr<DummySource> m_src;
StopCriteria::Ptr m_stop_criteria;
cv::GCompileArgs m_args;
size_t m_num_outputs;
PerfReport m_perf;
@@ -77,11 +87,13 @@ protected:
Pipeline::Pipeline(std::string&& name,
cv::GComputation&& comp,
std::shared_ptr<DummySource>&& src,
StopCriteria::Ptr stop_criteria,
cv::GCompileArgs&& args,
const size_t num_outputs)
: m_name(std::move(name)),
m_comp(std::move(comp)),
m_src(std::move(src)),
m_stop_criteria(std::move(stop_criteria)),
m_args(std::move(args)),
m_num_outputs(num_outputs) {
m_perf.name = m_name;
@@ -94,11 +106,23 @@ void Pipeline::compile() {
});
}
void Pipeline::run(double work_time_ms) {
auto run_perf = _run(work_time_ms);
void Pipeline::run() {
using namespace std::chrono;
init();
auto start = high_resolution_clock::now();
m_stop_criteria->start();
while (true) {
m_perf.latencies.push_back(run_iter());
m_perf.elapsed = duration_cast<milliseconds>(high_resolution_clock::now() - start).count();
m_stop_criteria->iter();
if (m_stop_criteria->isOver()) {
deinit();
break;
}
}
m_perf.elapsed = run_perf.elapsed;
m_perf.latencies = std::move(run_perf.latencies);
m_perf.avg_latency = utils::avg(m_perf.latencies);
m_perf.min_latency = utils::min(m_perf.latencies);
m_perf.max_latency = utils::max(m_perf.latencies);
@@ -113,7 +137,6 @@ void Pipeline::run(double work_time_ms) {
m_perf.throughput =
(m_perf.latencies.size() / static_cast<double>(m_perf.elapsed)) * 1000;
}
const PerfReport& Pipeline::report() const {
@@ -131,39 +154,31 @@ private:
cv::GCompileArgs(m_args));
}
Pipeline::RunPerf _run(double work_time_ms) override {
// NB: Setup.
virtual void init() override {
using namespace std::chrono;
// NB: N-1 buffers + timestamp.
std::vector<cv::Mat> out_mats(m_num_outputs - 1);
int64_t start_ts = -1;
cv::GRunArgsP pipeline_outputs;
for (auto& m : out_mats) {
pipeline_outputs += cv::gout(m);
m_out_mats.resize(m_num_outputs - 1);
for (auto& m : m_out_mats) {
m_pipeline_outputs += cv::gout(m);
}
pipeline_outputs += cv::gout(start_ts);
m_pipeline_outputs += cv::gout(m_start_ts);
m_compiled.setSource(m_src);
// NB: Start execution & measure performance statistics.
Pipeline::RunPerf perf;
auto start = high_resolution_clock::now();
m_compiled.start();
while (m_compiled.pull(cv::GRunArgsP{pipeline_outputs})) {
int64_t latency = utils::timestamp<milliseconds>() - start_ts;
perf.latencies.push_back(latency);
perf.elapsed = duration_cast<milliseconds>(
high_resolution_clock::now() - start).count();
if (perf.elapsed >= work_time_ms) {
m_compiled.stop();
break;
}
};
return perf;
}
virtual void deinit() override {
m_compiled.stop();
}
virtual int64_t run_iter() override {
m_compiled.pull(cv::GRunArgsP{m_pipeline_outputs});
return utils::timestamp<std::chrono::milliseconds>() - m_start_ts;
}
cv::GStreamingCompiled m_compiled;
cv::GRunArgsP m_pipeline_outputs;
std::vector<cv::Mat> m_out_mats;
int64_t m_start_ts;
};
class RegularPipeline : public Pipeline {
@@ -177,37 +192,26 @@ private:
cv::GCompileArgs(m_args));
}
Pipeline::RunPerf _run(double work_time_ms) override {
// NB: Setup
using namespace std::chrono;
cv::gapi::wip::Data d;
std::vector<cv::Mat> out_mats(m_num_outputs);
cv::GRunArgsP pipeline_outputs;
for (auto& m : out_mats) {
pipeline_outputs += cv::gout(m);
virtual void init() override {
m_out_mats.resize(m_num_outputs);
for (auto& m : m_out_mats) {
m_pipeline_outputs += cv::gout(m);
}
}
// NB: Start execution & measure performance statistics.
Pipeline::RunPerf perf;
auto start = high_resolution_clock::now();
while (m_src->pull(d)) {
auto in_mat = cv::util::get<cv::Mat>(d);
int64_t latency = utils::measure<milliseconds>([&]{
m_compiled(cv::gin(in_mat), cv::GRunArgsP{pipeline_outputs});
});
perf.latencies.push_back(latency);
perf.elapsed = duration_cast<milliseconds>(
high_resolution_clock::now() - start).count();
if (perf.elapsed >= work_time_ms) {
break;
}
};
return perf;
virtual int64_t run_iter() override {
using namespace std::chrono;
cv::gapi::wip::Data d;
m_src->pull(d);
auto in_mat = cv::util::get<cv::Mat>(d);
return utils::measure<milliseconds>([&]{
m_compiled(cv::gin(in_mat), cv::GRunArgsP{m_pipeline_outputs});
});
}
cv::GCompiled m_compiled;
cv::GCompiled m_compiled;
cv::GRunArgsP m_pipeline_outputs;
std::vector<cv::Mat> m_out_mats;
};
enum class PLMode {

modules/gapi/samples/pipeline_modeling_tool/pipeline_builder.hpp
@@ -262,6 +262,49 @@ struct InferParams {
cv::util::optional<int> out_precision;
};
class ElapsedTimeCriteria : public StopCriteria {
public:
ElapsedTimeCriteria(int64_t work_time_mcs) : m_work_time_mcs(work_time_mcs) { };
void start() override {
m_start_ts = m_curr_ts = utils::timestamp<std::chrono::microseconds>();
}
void iter() override {
m_curr_ts = utils::timestamp<std::chrono::microseconds>();
}
bool isOver() override {
return (m_curr_ts - m_start_ts) >= m_work_time_mcs;
}
private:
int64_t m_work_time_mcs;
int64_t m_start_ts = -1;
int64_t m_curr_ts = -1;
};
class NumItersCriteria : public StopCriteria {
public:
NumItersCriteria(int64_t num_iters) : m_num_iters(num_iters) { };
void start() override {
m_curr_iters = 0;
}
void iter() override {
++m_curr_iters;
}
bool isOver() override {
return m_curr_iters == m_num_iters;
}
private:
int64_t m_num_iters;
int64_t m_curr_iters = 0;
};
class PipelineBuilder {
public:
PipelineBuilder();
@@ -279,6 +322,7 @@ public:
void setDumpFilePath(const std::string& dump);
void setQueueCapacity(const size_t qc);
void setName(const std::string& name);
void setStopCriteria(StopCriteria::Ptr stop_criteria);
Pipeline::Ptr build();
@@ -306,6 +350,7 @@ private:
std::shared_ptr<DummySource> src;
PLMode mode = PLMode::STREAMING;
std::string name;
StopCriteria::Ptr stop_criteria;
};
std::unique_ptr<State> m_state;
@@ -432,6 +477,10 @@ void PipelineBuilder::setName(const std::string& name) {
m_state->name = name;
}
void PipelineBuilder::setStopCriteria(StopCriteria::Ptr stop_criteria) {
m_state->stop_criteria = std::move(stop_criteria);
}
static bool visit(Node::Ptr node,
std::vector<Node::Ptr>& sorted,
std::unordered_map<Node::Ptr, int>& visited) {
@@ -590,6 +639,7 @@ Pipeline::Ptr PipelineBuilder::construct() {
}
}
GAPI_Assert(m_state->stop_criteria);
if (m_state->mode == PLMode::STREAMING) {
GAPI_Assert(graph_inputs.size() == 1);
GAPI_Assert(cv::util::holds_alternative<cv::GMat>(graph_inputs[0]));
@@ -605,6 +655,7 @@ Pipeline::Ptr PipelineBuilder::construct() {
cv::GProtoInputArgs{graph_inputs},
cv::GProtoOutputArgs{graph_outputs}),
std::move(m_state->src),
std::move(m_state->stop_criteria),
std::move(m_state->compile_args),
graph_outputs.size());
}
@@ -614,6 +665,7 @@ Pipeline::Ptr PipelineBuilder::construct() {
cv::GProtoInputArgs{graph_inputs},
cv::GProtoOutputArgs{graph_outputs}),
std::move(m_state->src),
std::move(m_state->stop_criteria),
std::move(m_state->compile_args),
graph_outputs.size());
}
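
The new StopCriteria interface declared in pipeline.hpp, together with the ElapsedTimeCriteria and NumItersCriteria implementations above, makes the stop condition pluggable: Pipeline::run() only ever calls start(), iter() and isOver(). As a purely hypothetical sketch that is not part of this commit, a criterion that stops on whichever comes first, a fixed iteration count or an elapsed-time budget, could be written against the same protocol, reusing the utils::timestamp<> helper that the built-in criteria already use:

    // Hypothetical example, not part of this commit: stops after either
    // max_iters iterations or work_time_mcs microseconds, whichever comes first.
    class ItersOrElapsedCriteria : public StopCriteria {
    public:
        ItersOrElapsedCriteria(int64_t max_iters, int64_t work_time_mcs)
            : m_max_iters(max_iters), m_work_time_mcs(work_time_mcs) { }

        void start() override {
            m_curr_iters = 0;
            m_start_ts = m_curr_ts = utils::timestamp<std::chrono::microseconds>();
        }
        void iter() override {
            ++m_curr_iters;
            m_curr_ts = utils::timestamp<std::chrono::microseconds>();
        }
        bool isOver() override {
            return m_curr_iters >= m_max_iters
                || (m_curr_ts - m_start_ts) >= m_work_time_mcs;
        }
    private:
        int64_t m_max_iters;
        int64_t m_work_time_mcs;
        int64_t m_curr_iters = 0;
        int64_t m_start_ts  = -1;
        int64_t m_curr_ts   = -1;
    };

Such a criterion would be handed to the builder exactly like the built-in ones, e.g. builder.setStopCriteria(StopCriteria::Ptr(new ItersOrElapsedCriteria(100, work_time_mcs))).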

@ -119,6 +119,12 @@ T min(const std::vector<T>& vec) {
return *std::min_element(vec.begin(), vec.end());
}
template <typename T>
int64_t ms_to_mcs(T ms) {
using namespace std::chrono;
return duration_cast<microseconds>(duration<T, std::milli>(ms)).count();
}
} // namespace utils
#endif // OPENCV_GAPI_PIPELINE_MODELING_TOOL_UTILS_HPP
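
The ms_to_mcs helper above is what main() uses to turn the (possibly fractional) work_time value from the config into the whole-microsecond budget consumed by ElapsedTimeCriteria. A minimal standalone sketch of the same conversion, assuming only standard headers:

    #include <cassert>
    #include <chrono>
    #include <cstdint>

    template <typename T>
    int64_t ms_to_mcs(T ms) {
        using namespace std::chrono;
        return duration_cast<microseconds>(duration<T, std::milli>(ms)).count();
    }

    int main() {
        assert(ms_to_mcs(2000.0) == 2000000); // 2000 ms of work_time -> 2,000,000 us
        assert(ms_to_mcs(1.5)    == 1500);    // fractional milliseconds are preserved
        return 0;
    }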
