Merge pull request #23211 from TolyaTalamanov:at/pipeline-modeling-tool-perf-alignment

[G-API] Pipeline modeling tool: Refactor calculating performance statistics

* Add warmup execution

* Align perf metrics

* Add busy wait mode for source

* Small fix for late frames

* Rename pl_fn to src_fn

* Change how statistics are shown

* Correct warm-up iteration

* Properly calculate drop frames

* Enable frame dropping for streaming mode

* Enable frame dropping for streaming mode

* Address review comments

* Fix typos

* Cosmetic
Anatoliy Talamanov committed (via GitHub)
parent 58d8a2702a
commit 6c235c8edb
5 changed files:
1. modules/gapi/samples/pipeline_modeling_tool.cpp (25 lines changed)
2. modules/gapi/samples/pipeline_modeling_tool/dummy_source.hpp (63 lines changed)
3. modules/gapi/samples/pipeline_modeling_tool/pipeline.hpp (160 lines changed)
4. modules/gapi/samples/pipeline_modeling_tool/pipeline_builder.hpp (27 lines changed)
5. modules/gapi/samples/pipeline_modeling_tool/utils.hpp (26 lines changed)

modules/gapi/samples/pipeline_modeling_tool.cpp

@@ -35,6 +35,22 @@ static AppMode strToAppMode(const std::string& mode_str) {
     }
 }
 
+enum class WaitMode {
+    BUSY,
+    SLEEP
+};
+
+static WaitMode strToWaitMode(const std::string& mode_str) {
+    if (mode_str == "sleep") {
+        return WaitMode::SLEEP;
+    } else if (mode_str == "busy") {
+        return WaitMode::BUSY;
+    } else {
+        throw std::logic_error("Unsupported wait mode: " + mode_str +
+                               "\nPlease choose between: busy (default) and sleep");
+    }
+}
+
 template <typename T>
 T read(const cv::FileNode& node) {
     return static_cast<T>(node);
@@ -401,7 +417,12 @@ int main(int argc, char* argv[]) {
         if (app_mode == AppMode::BENCHMARK) {
             latency = 0.0;
         }
-        auto src = std::make_shared<DummySource>(latency, output, drop_frames);
+
+        const auto wait_mode =
+            strToWaitMode(readOpt<std::string>(src_fn["wait_mode"]).value_or("busy"));
+        auto wait_strategy = (wait_mode == WaitMode::SLEEP) ? utils::sleep : utils::busyWait;
+        auto src = std::make_shared<DummySource>(
+            utils::double_ms_t{latency}, output, drop_frames, std::move(wait_strategy));
         builder.setSource(src_name, src);
     }
@@ -446,7 +467,7 @@ int main(int argc, char* argv[]) {
     // NB: Pipeline mode from config takes priority over cmd.
     auto pl_mode = cfg_pl_mode.has_value()
         ? strToPLMode(cfg_pl_mode.value()) : cmd_pl_mode;
-    // NB: Using drop_frames with streaming pipelines will follow to
+    // NB: Using drop_frames with streaming pipelines will lead to
     // incorrect performance results.
     if (drop_frames && pl_mode == PLMode::STREAMING) {
         throw std::logic_error(

modules/gapi/samples/pipeline_modeling_tool/dummy_source.hpp

@@ -12,26 +12,36 @@
 class DummySource final: public cv::gapi::wip::IStreamSource {
 public:
+    using WaitStrategy = std::function<void(std::chrono::microseconds)>;
     using Ptr = std::shared_ptr<DummySource>;
-    DummySource(const double latency,
-                const OutputDescr& output,
-                const bool drop_frames);
+    using ts_t = std::chrono::microseconds;
+
+    template <typename DurationT>
+    DummySource(const DurationT latency,
+                const OutputDescr& output,
+                const bool drop_frames,
+                WaitStrategy&& wait);
+
     bool pull(cv::gapi::wip::Data& data) override;
     cv::GMetaArg descr_of() const override;
-    double latency() const { return m_latency; };
 
 private:
-    double m_latency;
+    int64_t m_latency;
     cv::Mat m_mat;
     bool m_drop_frames;
-    double m_next_tick_ts = -1;
+    int64_t m_next_tick_ts = -1;
     int64_t m_curr_seq_id = 0;
+    WaitStrategy m_wait;
 };
 
-DummySource::DummySource(const double latency,
-                         const OutputDescr& output,
-                         const bool drop_frames)
-    : m_latency(latency), m_drop_frames(drop_frames) {
+template <typename DurationT>
+DummySource::DummySource(const DurationT latency,
+                         const OutputDescr& output,
+                         const bool drop_frames,
+                         WaitStrategy&& wait)
+    : m_latency(std::chrono::duration_cast<ts_t>(latency).count()),
+      m_drop_frames(drop_frames),
+      m_wait(std::move(wait)) {
    utils::createNDMat(m_mat, output.dims, output.precision);
    utils::generateRandom(m_mat);
 }
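
For orientation, a minimal construction sketch against the new interface. The dims/precision values and the 30 FPS figure are invented for illustration; `dims` and `precision` are the `OutputDescr` fields used by `createNDMat` above:

```cpp
// Invented example values: a 1x3x720x1280 U8 tensor source at ~30 FPS.
OutputDescr output;
output.dims      = {1, 3, 720, 1280};
output.precision = CV_8U;

// Any std::chrono duration works now; it is normalized to microseconds inside.
auto src = std::make_shared<DummySource>(
    utils::double_ms_t{1000.0 / 30},  // ~33.3 ms per tick
    output,
    /*drop_frames=*/true,
    DummySource::WaitStrategy{utils::busyWait});
```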
@@ -42,10 +52,10 @@ bool DummySource::pull(cv::gapi::wip::Data& data) {
     // NB: Wait m_latency before return the first frame.
     if (m_next_tick_ts == -1) {
-        m_next_tick_ts = utils::timestamp<milliseconds>() + m_latency;
+        m_next_tick_ts = utils::timestamp<ts_t>() + m_latency;
     }
 
-    int64_t curr_ts = utils::timestamp<milliseconds>();
+    int64_t curr_ts = utils::timestamp<ts_t>();
     if (curr_ts < m_next_tick_ts) {
         /*
          *            curr_ts
@@ -57,8 +67,8 @@ bool DummySource::pull(cv::gapi::wip::Data& data) {
          *
          * NB: New frame will be produced at the m_next_tick_ts point.
          */
-        utils::sleep(m_next_tick_ts - curr_ts);
-    } else {
+        m_wait(ts_t{m_next_tick_ts - curr_ts});
+    } else if (m_latency != 0) {
         /*
          *         curr_ts
          *      +1 +2    |
@@ -66,29 +76,28 @@ bool DummySource::pull(cv::gapi::wip::Data& data) {
          *       ^     ^
          *       m_next_tick_ts ------------->
          *
-         *
-         * NB: Shift m_next_tick_ts to the nearest tick before curr_ts and
-         * update current seq_id correspondingly.
-         *
-         * if drop_frames is enabled, wait for the next tick, otherwise
-         * return last written frame (+2 at the picture above) immediately.
          */
+        // NB: Count how many frames have been produced since last pull (m_next_tick_ts).
         int64_t num_frames =
             static_cast<int64_t>((curr_ts - m_next_tick_ts) / m_latency);
-        m_curr_seq_id += num_frames;
+        // NB: Shift m_next_tick_ts to the nearest tick before curr_ts.
         m_next_tick_ts += num_frames * m_latency;
+        // NB: If drop_frames is enabled, update current seq_id and wait for the
+        // next tick, otherwise return last written frame (+2 at the picture
+        // above) immediately.
         if (m_drop_frames) {
+            // NB: Shift tick to the next frame.
             m_next_tick_ts += m_latency;
-            ++m_curr_seq_id;
-            utils::sleep(m_next_tick_ts - curr_ts);
+            // NB: Wait for the next frame.
+            m_wait(ts_t{m_next_tick_ts - curr_ts});
+            // NB: Drop already produced frames + update seq_id for the current.
+            m_curr_seq_id += num_frames + 1;
         }
     }
 
     // NB: Just increase reference counter not to release mat memory
     // after assigning it to the data.
     cv::Mat mat = m_mat;
-    data.meta[meta_tag::timestamp] = utils::timestamp<milliseconds>();
+    data.meta[meta_tag::timestamp] = utils::timestamp<ts_t>();
     data.meta[meta_tag::seq_id] = m_curr_seq_id++;
     data = mat;
     m_next_tick_ts += m_latency;
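
To make the new bookkeeping concrete, a worked example with assumed numbers (10 ms latency, a pull arriving 23 ms after the expected tick):

```cpp
// Assumed numbers, timestamps in microseconds.
const int64_t latency   = 10000;  // m_latency: one frame every 10 ms
const int64_t next_tick = 50000;  // m_next_tick_ts when pull() arrives
const int64_t curr      = 73000;  // curr_ts: 23 ms late

const int64_t num_frames = (curr - next_tick) / latency;  // 2 ticks were missed
int64_t tick = next_tick + num_frames * latency;          // 70000: nearest tick before curr
// With drop_frames enabled:
tick += latency;                                          // 80000: the next real tick
// m_wait(80000 - 73000) spins/sleeps 7 ms, then m_curr_seq_id += num_frames + 1 (== 3):
// the 50 ms and 60 ms frames are dropped and the 80 ms frame becomes current.
```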

modules/gapi/samples/pipeline_modeling_tool/pipeline.hpp

@@ -6,34 +6,39 @@
 struct PerfReport {
     std::string name;
     double avg_latency = 0.0;
-    int64_t min_latency = 0;
-    int64_t max_latency = 0;
-    int64_t first_latency = 0;
+    double min_latency = 0.0;
+    double max_latency = 0.0;
+    double first_latency = 0.0;
     double throughput = 0.0;
-    int64_t elapsed = 0;
-    int64_t warmup_time = 0;
+    double elapsed = 0.0;
+    double warmup_time = 0.0;
     int64_t num_late_frames = 0;
-    std::vector<int64_t> latencies;
+    std::vector<double> latencies;
+    std::vector<int64_t> seq_ids;
 
     std::string toStr(bool expanded = false) const;
 };
 
 std::string PerfReport::toStr(bool expand) const {
+    const auto to_double_str = [](double val) {
+        std::stringstream ss;
+        ss << std::fixed << std::setprecision(3) << val;
+        return ss.str();
+    };
+
     std::stringstream ss;
-    ss << name << ": \n"
-       << "  Warm up time: " << warmup_time << " ms\n"
-       << "  Execution time: " << elapsed << " ms\n"
-       << "  Frames: " << num_late_frames << "/" << latencies.size() << " (late/all)\n"
-       << "  Latency:\n"
-       << "    first: " << first_latency << " ms\n"
-       << "    min: " << min_latency << " ms\n"
-       << "    max: " << max_latency << " ms\n"
-       << "    avg: " << std::fixed << std::setprecision(3) << avg_latency << " ms\n"
-       << "  Throughput: " << std::fixed << std::setprecision(3) << throughput << " FPS";
+    ss << name << ": warm-up: " << to_double_str(warmup_time)
+       << " ms, execution time: " << to_double_str(elapsed)
+       << " ms, throughput: " << to_double_str(throughput)
+       << " FPS, latency: first: " << to_double_str(first_latency)
+       << " ms, min: " << to_double_str(min_latency)
+       << " ms, avg: " << to_double_str(avg_latency)
+       << " ms, max: " << to_double_str(max_latency)
+       << " ms, frames: " << num_late_frames << "/" << seq_ids.back() + 1 << " (dropped/all)";
+
     if (expand) {
         for (size_t i = 0; i < latencies.size(); ++i) {
             ss << "\nFrame:" << i << "\nLatency: "
-               << latencies[i] << " ms";
+               << to_double_str(latencies[i]) << " ms";
         }
     }
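
With the new compact format, a report prints as a single line shaped roughly like this (numbers invented for illustration):

```text
MyPipeline: warm-up: 152.318 ms, execution time: 10021.409 ms, throughput: 29.936 FPS, latency: first: 41.204 ms, min: 32.911 ms, avg: 33.402 ms, max: 47.615 ms, frames: 4/300 (dropped/all)
```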
@@ -70,10 +75,12 @@ public:
     virtual ~Pipeline() = default;
 
 protected:
     virtual void _compile() = 0;
-    virtual int64_t run_iter() = 0;
+    virtual void run_iter() = 0;
     virtual void init() {};
     virtual void deinit() {};
 
+    void prepareOutputs();
+
     std::string m_name;
     cv::GComputation m_comp;
@@ -82,6 +89,11 @@ protected:
     cv::GCompileArgs m_args;
     size_t m_num_outputs;
     PerfReport m_perf;
+
+    cv::GRunArgsP m_pipeline_outputs;
+    std::vector<cv::Mat> m_out_mats;
+    int64_t m_start_ts;
+    int64_t m_seq_id;
 };
 
 Pipeline::Pipeline(std::string&& name,
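
Note how the pieces connect: prepareOutputs() below binds the new m_start_ts and m_seq_id members into m_pipeline_outputs, and pipeline_builder.hpp (further down) appends cv::gapi::streaming::timestamp/seq_id outputs to every graph, so each run_iter() fills them regardless of pipeline mode.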
@@ -101,42 +113,82 @@ Pipeline::Pipeline(std::string&& name,
 void Pipeline::compile() {
     m_perf.warmup_time =
-        utils::measure<std::chrono::milliseconds>([this]() {
+        utils::measure<utils::double_ms_t>([this]() {
         _compile();
     });
 }
 
+void Pipeline::prepareOutputs() {
+    // NB: N-2 buffers + timestamp + seq_id.
+    m_out_mats.resize(m_num_outputs - 2);
+    for (auto& m : m_out_mats) {
+        m_pipeline_outputs += cv::gout(m);
+    }
+    m_pipeline_outputs += cv::gout(m_start_ts);
+    m_pipeline_outputs += cv::gout(m_seq_id);
+}
+
 void Pipeline::run() {
     using namespace std::chrono;
 
+    // NB: Allocate outputs for execution.
+    prepareOutputs();
+
+    // NB: Warm-up iteration invalidates source state,
+    // so need to copy it.
+    auto orig_src = m_src;
+    auto copy_src = std::make_shared<DummySource>(*m_src);
+
+    // NB: Use copy for warm-up iteration.
+    m_src = copy_src;
+
+    // NB: Warm-up iteration.
+    init();
+    run_iter();
+    deinit();
+
+    // NB: Calculate first latency.
+    m_perf.first_latency = utils::double_ms_t{
+        microseconds{utils::timestamp<microseconds>() - m_start_ts}}.count();
+
+    // NB: Now use original source.
+    m_src = orig_src;
+
+    // NB: Start measuring execution.
     init();
     auto start = high_resolution_clock::now();
     m_stop_criterion->start();
+
     while (true) {
-        m_perf.latencies.push_back(run_iter());
-        m_perf.elapsed = duration_cast<milliseconds>(high_resolution_clock::now() - start).count();
+        run_iter();
+        const auto latency = utils::double_ms_t{
+            microseconds{utils::timestamp<microseconds>() - m_start_ts}}.count();
+
+        m_perf.latencies.push_back(latency);
+        m_perf.seq_ids.push_back(m_seq_id);
+
         m_stop_criterion->iter();
+
         if (m_stop_criterion->done()) {
+            m_perf.elapsed = duration_cast<utils::double_ms_t>(
+                high_resolution_clock::now() - start).count();
             deinit();
             break;
         }
     }
 
     m_perf.avg_latency = utils::avg(m_perf.latencies);
     m_perf.min_latency = utils::min(m_perf.latencies);
     m_perf.max_latency = utils::max(m_perf.latencies);
-    m_perf.first_latency = m_perf.latencies[0];
 
-    // NB: Count how many executions don't fit into camera latency interval.
-    m_perf.num_late_frames =
-        std::count_if(m_perf.latencies.begin(), m_perf.latencies.end(),
-                      [this](int64_t latency) {
-                          return static_cast<double>(latency) > m_src->latency();
-                      });
+    // NB: Count the number of dropped frames.
+    int64_t prev_seq_id = m_perf.seq_ids[0];
+    for (size_t i = 1; i < m_perf.seq_ids.size(); ++i) {
+        m_perf.num_late_frames += m_perf.seq_ids[i] - prev_seq_id - 1;
+        prev_seq_id = m_perf.seq_ids[i];
+    }
 
-    m_perf.throughput =
-        (m_perf.latencies.size() / static_cast<double>(m_perf.elapsed)) * 1000;
+    m_perf.throughput = (m_perf.latencies.size() / m_perf.elapsed) * 1000;
 }
 
 const PerfReport& Pipeline::report() const {
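
Dropped frames are now derived from gaps in the recorded seq_ids rather than from comparing latencies against the source period. A small self-contained check of the counting loop (the seq_id values are made up):

```cpp
#include <cstdint>
#include <vector>

int main() {
    // Assumed per-iteration seq_ids: frames 2, 4 and 5 never reached the sink.
    const std::vector<int64_t> seq_ids = {0, 1, 3, 6};
    int64_t dropped = 0, prev = seq_ids[0];
    for (size_t i = 1; i < seq_ids.size(); ++i) {
        dropped += seq_ids[i] - prev - 1;  // gap size between consecutive ids
        prev = seq_ids[i];
    }
    // (3-1-1) + (6-3-1) = 1 + 2, so dropped == 3.
    return dropped == 3 ? 0 : 1;
}
```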
@@ -155,13 +207,6 @@ private:
     }
 
     virtual void init() override {
-        using namespace std::chrono;
-        // NB: N-1 buffers + timestamp.
-        m_out_mats.resize(m_num_outputs - 1);
-        for (auto& m : m_out_mats) {
-            m_pipeline_outputs += cv::gout(m);
-        }
-        m_pipeline_outputs += cv::gout(m_start_ts);
         m_compiled.setSource(m_src);
         m_compiled.start();
     }
@@ -170,15 +215,11 @@ private:
         m_compiled.stop();
     }
 
-    virtual int64_t run_iter() override {
+    virtual void run_iter() override {
         m_compiled.pull(cv::GRunArgsP{m_pipeline_outputs});
-        return utils::timestamp<std::chrono::milliseconds>() - m_start_ts;
     }
 
     cv::GStreamingCompiled m_compiled;
-    cv::GRunArgsP m_pipeline_outputs;
-    std::vector<cv::Mat> m_out_mats;
-    int64_t m_start_ts;
 };
 
 class RegularPipeline : public Pipeline {
@@ -192,26 +233,13 @@ private:
                                cv::GCompileArgs(m_args));
     }
 
-    virtual void init() override {
-        m_out_mats.resize(m_num_outputs);
-        for (auto& m : m_out_mats) {
-            m_pipeline_outputs += cv::gout(m);
-        }
-    }
-
-    virtual int64_t run_iter() override {
-        using namespace std::chrono;
-        cv::gapi::wip::Data d;
-        m_src->pull(d);
-        auto in_mat = cv::util::get<cv::Mat>(d);
-        return utils::measure<milliseconds>([&]{
-            m_compiled(cv::gin(in_mat), cv::GRunArgsP{m_pipeline_outputs});
-        });
+    virtual void run_iter() override {
+        cv::gapi::wip::Data data;
+        m_src->pull(data);
+        m_compiled({data}, cv::GRunArgsP{m_pipeline_outputs});
     }
 
     cv::GCompiled m_compiled;
-    cv::GRunArgsP m_pipeline_outputs;
-    std::vector<cv::Mat> m_out_mats;
 };
 
 enum class PLMode {

modules/gapi/samples/pipeline_modeling_tool/pipeline_builder.hpp

@@ -163,13 +163,10 @@ struct DummyCall {
                     cv::Mat& out_mat,
                     DummyState& state) {
         using namespace std::chrono;
-        double total = 0;
-        auto start = high_resolution_clock::now();
+        auto start_ts = utils::timestamp<utils::double_ms_t>();
         state.mat.copyTo(out_mat);
-        while (total < time) {
-            total = duration_cast<duration<double, std::milli>>(
-                    high_resolution_clock::now() - start).count();
-        }
+        auto elapsed = utils::timestamp<utils::double_ms_t>() - start_ts;
+        utils::busyWait(duration_cast<microseconds>(utils::double_ms_t{time - elapsed}));
     }
 };
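
Note that if the copyTo alone exceeds the requested budget, time - elapsed goes negative; the casted delay is then negative and utils::busyWait falls through immediately, since its loop condition is false on entry.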
@@ -656,16 +653,16 @@ Pipeline::Ptr PipelineBuilder::construct() {
     }
     GAPI_Assert(m_state->stop_criterion);
 
-    if (m_state->mode == PLMode::STREAMING) {
-        GAPI_Assert(graph_inputs.size() == 1);
-        GAPI_Assert(cv::util::holds_alternative<cv::GMat>(graph_inputs[0]));
-        // FIXME: Handle GFrame when NV12 comes.
-        const auto& graph_input = cv::util::get<cv::GMat>(graph_inputs[0]);
-        // NB: In case streaming mode need to expose timestamp in order to
-        // calculate performance metrics.
-        graph_outputs.emplace_back(
-            cv::gapi::streaming::timestamp(graph_input).strip());
+    GAPI_Assert(graph_inputs.size() == 1);
+    GAPI_Assert(cv::util::holds_alternative<cv::GMat>(graph_inputs[0]));
+    // FIXME: Handle GFrame when NV12 comes.
+    const auto& graph_input = cv::util::get<cv::GMat>(graph_inputs[0]);
+    graph_outputs.emplace_back(
+        cv::gapi::streaming::timestamp(graph_input).strip());
+    graph_outputs.emplace_back(
+        cv::gapi::streaming::seq_id(graph_input).strip());
 
+    if (m_state->mode == PLMode::STREAMING) {
         return std::make_shared<StreamingPipeline>(std::move(m_state->name),
                                                    cv::GComputation(
                                                        cv::GProtoInputArgs{graph_inputs},

modules/gapi/samples/pipeline_modeling_tool/utils.hpp

@@ -17,6 +17,8 @@ struct OutputDescr {
 
 namespace utils {
 
+using double_ms_t = std::chrono::duration<double, std::milli>;
+
 inline void createNDMat(cv::Mat& mat, const std::vector<int>& dims, int depth) {
     GAPI_Assert(!dims.empty());
     mat.create(dims, depth);
@@ -50,10 +52,8 @@ inline void generateRandom(cv::Mat& out) {
     }
 }
 
-inline void sleep(double ms) {
+inline void sleep(std::chrono::microseconds delay) {
 #if defined(_WIN32)
-    // NB: It takes portions of 100 nanoseconds.
-    int64_t ns_units = static_cast<int64_t>(ms * 1e4);
     // FIXME: Wrap it to RAII and instance only once.
     HANDLE timer = CreateWaitableTimer(NULL, true, NULL);
     if (!timer) {
@@ -61,7 +61,12 @@ inline void sleep(double ms) {
     }
 
     LARGE_INTEGER li;
-    li.QuadPart = -ns_units;
+    using ns_t = std::chrono::nanoseconds;
+    using ns_100_t = std::chrono::duration<ns_t::rep,
+                                           std::ratio_multiply<std::ratio<100>, ns_t::period>>;
+    // NB: QuadPart takes portions of 100 nanoseconds.
+    li.QuadPart = -std::chrono::duration_cast<ns_100_t>(delay).count();
     if (!SetWaitableTimer(timer, &li, 0, NULL, NULL, false)) {
         CloseHandle(timer);
         throw std::logic_error("Failed to set timer");
@@ -72,8 +77,7 @@ inline void sleep(double ms) {
     }
     CloseHandle(timer);
 #else
-    using namespace std::chrono;
-    std::this_thread::sleep_for(duration<double, std::milli>(ms));
+    std::this_thread::sleep_for(delay);
 #endif
 }
@@ -93,6 +97,16 @@ typename duration_t::rep timestamp() {
     return duration_cast<duration_t>(now.time_since_epoch()).count();
 }
 
+inline void busyWait(std::chrono::microseconds delay) {
+    auto start_ts = timestamp<std::chrono::microseconds>();
+    auto end_ts = start_ts;
+    auto time_to_wait = delay.count();
+
+    while (end_ts - start_ts < time_to_wait) {
+        end_ts = timestamp<std::chrono::microseconds>();
+    }
+}
+
 template <typename K, typename V>
 void mergeMapWith(std::map<K, V>& target, const std::map<K, V>& second) {
     for (auto&& item : second) {