diff --git a/modules/gapi/samples/pipeline_modeling_tool.cpp b/modules/gapi/samples/pipeline_modeling_tool.cpp
index 7c202642a9..3a300d7dd2 100644
--- a/modules/gapi/samples/pipeline_modeling_tool.cpp
+++ b/modules/gapi/samples/pipeline_modeling_tool.cpp
@@ -35,6 +35,22 @@ static AppMode strToAppMode(const std::string& mode_str) {
     }
 }
 
+enum class WaitMode {
+    BUSY,
+    SLEEP
+};
+
+static WaitMode strToWaitMode(const std::string& mode_str) {
+    if (mode_str == "sleep") {
+        return WaitMode::SLEEP;
+    } else if (mode_str == "busy") {
+        return WaitMode::BUSY;
+    } else {
+        throw std::logic_error("Unsupported wait mode: " + mode_str +
+                               "\nPlease choose between: busy (default) and sleep");
+    }
+}
+
 template <typename T>
 T read(const cv::FileNode& node) {
     return static_cast<T>(node);
@@ -401,7 +417,12 @@ int main(int argc, char* argv[]) {
             if (app_mode == AppMode::BENCHMARK) {
                 latency = 0.0;
             }
-            auto src = std::make_shared<DummySource>(latency, output, drop_frames);
+
+            const auto wait_mode =
+                strToWaitMode(readOpt<std::string>(src_fn["wait_mode"]).value_or("busy"));
+            auto wait_strategy = (wait_mode == WaitMode::SLEEP) ? utils::sleep : utils::busyWait;
+            auto src = std::make_shared<DummySource>(
+                utils::double_ms_t{latency}, output, drop_frames, std::move(wait_strategy));
             builder.setSource(src_name, src);
         }
 
@@ -446,7 +467,7 @@ int main(int argc, char* argv[]) {
         // NB: Pipeline mode from config takes priority over cmd.
         auto pl_mode = cfg_pl_mode.has_value()
                 ? strToPLMode(cfg_pl_mode.value()) : cmd_pl_mode;
-        // NB: Using drop_frames with streaming pipelines will follow to
+        // NB: Using drop_frames with streaming pipelines will lead to
        // incorrect performance results.
        if (drop_frames && pl_mode == PLMode::STREAMING) {
            throw std::logic_error(
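Worth spelling out about the hunk above: utils::sleep and utils::busyWait now share the signature void(std::chrono::microseconds), so the ternary yields a single callable type that fits DummySource::WaitStrategy (a std::function). A minimal standalone sketch of the same pattern — sleepWait and busyWait here are toy stand-ins, not the tool's functions:

    #include <chrono>
    #include <functional>
    #include <iostream>
    #include <thread>

    using WaitStrategy = std::function<void(std::chrono::microseconds)>;

    // Stand-in for utils::sleep: yields the CPU to the OS scheduler.
    static void sleepWait(std::chrono::microseconds delay) {
        std::this_thread::sleep_for(delay);
    }

    // Stand-in for utils::busyWait: spins until the deadline passes.
    static void busyWait(std::chrono::microseconds delay) {
        const auto deadline = std::chrono::steady_clock::now() + delay;
        while (std::chrono::steady_clock::now() < deadline) {}
    }

    int main() {
        const bool use_sleep = true; // plays the role of (wait_mode == WaitMode::SLEEP)
        WaitStrategy wait = use_sleep ? sleepWait : busyWait;
        wait(std::chrono::microseconds{1000});
        std::cout << "waited ~1ms\n";
    }

The trade-off is the usual one: sleeping frees the core but is at the mercy of the scheduler's granularity, while the busy wait (the default) burns a core to hit the tick more precisely.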
diff --git a/modules/gapi/samples/pipeline_modeling_tool/dummy_source.hpp b/modules/gapi/samples/pipeline_modeling_tool/dummy_source.hpp
index 4c2ea1638c..f0cad0338a 100644
--- a/modules/gapi/samples/pipeline_modeling_tool/dummy_source.hpp
+++ b/modules/gapi/samples/pipeline_modeling_tool/dummy_source.hpp
@@ -12,26 +12,36 @@
 class DummySource final: public cv::gapi::wip::IStreamSource {
 public:
+    using WaitStrategy = std::function<void(std::chrono::microseconds)>;
     using Ptr = std::shared_ptr<DummySource>;
-    DummySource(const double latency,
+    using ts_t = std::chrono::microseconds;
+
+    template <typename DurationT>
+    DummySource(const DurationT latency,
                 const OutputDescr& output,
-                const bool drop_frames);
+                const bool drop_frames,
+                WaitStrategy&& wait);
+
     bool pull(cv::gapi::wip::Data& data) override;
     cv::GMetaArg descr_of() const override;
-    double latency() const { return m_latency; };
 
 private:
-    double  m_latency;
-    cv::Mat m_mat;
-    bool    m_drop_frames;
-    double  m_next_tick_ts = -1;
-    int64_t m_curr_seq_id  = 0;
+    int64_t      m_latency;
+    cv::Mat      m_mat;
+    bool         m_drop_frames;
+    int64_t      m_next_tick_ts = -1;
+    int64_t      m_curr_seq_id  = 0;
+    WaitStrategy m_wait;
 };
 
-DummySource::DummySource(const double latency,
+template <typename DurationT>
+DummySource::DummySource(const DurationT latency,
                          const OutputDescr& output,
-                         const bool drop_frames)
-    : m_latency(latency), m_drop_frames(drop_frames) {
+                         const bool drop_frames,
+                         WaitStrategy&& wait)
+    : m_latency(std::chrono::duration_cast<ts_t>(latency).count()),
+      m_drop_frames(drop_frames),
+      m_wait(std::move(wait)) {
     utils::createNDMat(m_mat, output.dims, output.precision);
     utils::generateRandom(m_mat);
 }
@@ -42,10 +52,10 @@ bool DummySource::pull(cv::gapi::wip::Data& data) {
     // NB: Wait m_latency before returning the first frame.
     if (m_next_tick_ts == -1) {
-        m_next_tick_ts = utils::timestamp<milliseconds>() + m_latency;
+        m_next_tick_ts = utils::timestamp<microseconds>() + m_latency;
     }
 
-    int64_t curr_ts = utils::timestamp<milliseconds>();
+    int64_t curr_ts = utils::timestamp<microseconds>();
     if (curr_ts < m_next_tick_ts) {
         /*
          *            curr_ts
@@ -57,8 +67,8 @@ bool DummySource::pull(cv::gapi::wip::Data& data) {
          *
          * NB: New frame will be produced at the m_next_tick_ts point.
          */
-        utils::sleep(m_next_tick_ts - curr_ts);
-    } else {
+        m_wait(ts_t{m_next_tick_ts - curr_ts});
+    } else if (m_latency != 0) {
         /*
          *                       curr_ts
          *         +1         +2    |
@@ -66,29 +76,28 @@ bool DummySource::pull(cv::gapi::wip::Data& data) {
          *         ^          ^
          *    m_next_tick_ts ------------->
          *
-         *
-         * NB: Shift m_next_tick_ts to the nearest tick before curr_ts and
-         *     update current seq_id correspondingly.
-         *
-         *     if drop_frames is enabled, wait for the next tick, otherwise
-         *     return last written frame (+2 at the picture above) immediately.
          */
+
+        // NB: Count how many frames have been produced since the last pull
+        // (m_next_tick_ts).
         int64_t num_frames =
             static_cast<int64_t>((curr_ts - m_next_tick_ts) / m_latency);
-        m_curr_seq_id += num_frames;
+        // NB: Shift m_next_tick_ts to the nearest tick before curr_ts.
         m_next_tick_ts += num_frames * m_latency;
+        // NB: If drop_frames is enabled, update the current seq_id and wait for
+        // the next tick; otherwise return the last written frame
+        // (+2 in the picture above) immediately.
         if (m_drop_frames) {
+            // NB: Shift the tick to the next frame.
             m_next_tick_ts += m_latency;
-            ++m_curr_seq_id;
-            utils::sleep(m_next_tick_ts - curr_ts);
+            // NB: Wait for the next frame.
+            m_wait(ts_t{m_next_tick_ts - curr_ts});
+            // NB: Drop already produced frames + update seq_id for the current one.
+            m_curr_seq_id += num_frames + 1;
         }
     }
-
     // NB: Just increase the reference counter so the mat memory isn't released
     // after assigning it to the data.
     cv::Mat mat = m_mat;
 
-    data.meta[meta_tag::timestamp] = utils::timestamp<milliseconds>();
+    data.meta[meta_tag::timestamp] = utils::timestamp<microseconds>();
     data.meta[meta_tag::seq_id] = m_curr_seq_id++;
     data = mat;
     m_next_tick_ts += m_latency;
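To make the drop-frames arithmetic above concrete, here is a worked example with assumed numbers (10 ms latency, pull() arriving 25 ms after the pending tick); the variables mirror the members, but this is not the tool's code:

    #include <cstdint>
    #include <iostream>

    int main() {
        const int64_t latency      = 10000;  // m_latency: 10ms in microseconds
        int64_t       next_tick_ts = 100000; // m_next_tick_ts (absolute, us)
        int64_t       curr_seq_id  = 5;      // id the pending frame would get
        const int64_t curr_ts      = 125000; // pull() arrives 25ms past the tick

        // Frames +1 and +2 already "happened" while nobody pulled.
        const int64_t num_frames = (curr_ts - next_tick_ts) / latency; // 2
        next_tick_ts += num_frames * latency; // nearest tick before curr_ts: 120000

        const bool drop_frames = true;
        if (drop_frames) {
            next_tick_ts += latency;        // wait target becomes 130000
            curr_seq_id  += num_frames + 1; // ids 5..7 dropped; next delivered id is 8
        }
        std::cout << next_tick_ts << " " << curr_seq_id << "\n"; // prints: 130000 8
    }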
diff --git a/modules/gapi/samples/pipeline_modeling_tool/pipeline.hpp b/modules/gapi/samples/pipeline_modeling_tool/pipeline.hpp
index ac192cba52..5220a0d1ad 100644
--- a/modules/gapi/samples/pipeline_modeling_tool/pipeline.hpp
+++ b/modules/gapi/samples/pipeline_modeling_tool/pipeline.hpp
@@ -6,34 +6,39 @@
 struct PerfReport {
     std::string          name;
     double               avg_latency     = 0.0;
-    int64_t              min_latency     = 0;
-    int64_t              max_latency     = 0;
-    int64_t              first_latency   = 0;
+    double               min_latency     = 0.0;
+    double               max_latency     = 0.0;
+    double               first_latency   = 0.0;
     double               throughput      = 0.0;
-    int64_t              elapsed         = 0;
-    int64_t              warmup_time     = 0;
+    double               elapsed         = 0.0;
+    double               warmup_time     = 0.0;
     int64_t              num_late_frames = 0;
-    std::vector<int64_t> latencies;
+    std::vector<double>  latencies;
+    std::vector<int64_t> seq_ids;
 
     std::string toStr(bool expanded = false) const;
 };
 
 std::string PerfReport::toStr(bool expand) const {
+    const auto to_double_str = [](double val) {
+        std::stringstream ss;
+        ss << std::fixed << std::setprecision(3) << val;
+        return ss.str();
+    };
+
     std::stringstream ss;
-    ss << name << ": \n"
-       << "  Warm up time: " << warmup_time << " ms\n"
-       << "  Execution time: " << elapsed << " ms\n"
-       << "  Frames: " << num_late_frames << "/" << latencies.size() << " (late/all)\n"
-       << "  Latency:\n"
-       << "    first: " << first_latency << " ms\n"
-       << "    min: " << min_latency << " ms\n"
-       << "    max: " << max_latency << " ms\n"
-       << "    avg: " << std::fixed << std::setprecision(3) << avg_latency << " ms\n"
-       << "  Throughput: " << std::fixed << std::setprecision(3) << throughput << " FPS";
+    ss << name << ": warm-up: " << to_double_str(warmup_time)
+       << " ms, execution time: " << to_double_str(elapsed)
+       << " ms, throughput: " << to_double_str(throughput)
+       << " FPS, latency: first: " << to_double_str(first_latency)
+       << " ms, min: " << to_double_str(min_latency)
+       << " ms, avg: " << to_double_str(avg_latency)
+       << " ms, max: " << to_double_str(max_latency)
+       << " ms, frames: " << num_late_frames << "/" << seq_ids.back() + 1 << " (dropped/all)";
 
     if (expand) {
         for (size_t i = 0; i < latencies.size(); ++i) {
             ss << "\nFrame:" << i << "\nLatency: "
-               << latencies[i] << " ms";
+               << to_double_str(latencies[i]) << " ms";
         }
     }
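The new (dropped/all) pair in the one-line report reads as follows: seq_ids keeps the source-assigned id of every frame that actually reached the output, so gaps in it are frames the source dropped, and seq_ids.back() + 1 is the total the source produced. A standalone sketch with assumed ids, using the same gap-sum that Pipeline::run() performs below:

    #include <cstdint>
    #include <iostream>
    #include <vector>

    int main() {
        // Assumed ids of the frames that reached the pipeline output.
        const std::vector<int64_t> seq_ids = {0, 2, 3, 5};

        int64_t dropped = 0;
        int64_t prev    = seq_ids[0];
        for (size_t i = 1; i < seq_ids.size(); ++i) {
            dropped += seq_ids[i] - prev - 1; // every id jump hides dropped frames
            prev = seq_ids[i];
        }
        std::cout << dropped << "/" << seq_ids.back() + 1 << " (dropped/all)\n"; // 2/6
    }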
@@ -70,10 +75,12 @@ public:
     virtual ~Pipeline() = default;
 
 protected:
-    virtual void _compile() = 0;
-    virtual int64_t run_iter() = 0;
-    virtual void init() {};
-    virtual void deinit() {};
+    virtual void _compile() = 0;
+    virtual void run_iter() = 0;
+    virtual void init()     {};
+    virtual void deinit()   {};
+
+    void prepareOutputs();
 
     std::string                  m_name;
     cv::GComputation             m_comp;
     std::shared_ptr<DummySource> m_src;
     StopCriterion::Ptr           m_stop_criterion;
     cv::GCompileArgs             m_args;
     size_t                       m_num_outputs;
     PerfReport                   m_perf;
+
+    cv::GRunArgsP        m_pipeline_outputs;
+    std::vector<cv::Mat> m_out_mats;
+    int64_t              m_start_ts;
+    int64_t              m_seq_id;
 };
 
 Pipeline::Pipeline(std::string&& name,
@@ -101,42 +113,82 @@ Pipeline::Pipeline(std::string&& name,
 
 void Pipeline::compile() {
     m_perf.warmup_time =
-        utils::measure<std::chrono::milliseconds>([this]() {
+        utils::measure<utils::double_ms_t>([this]() {
             _compile();
         });
 }
 
+void Pipeline::prepareOutputs() {
+    // NB: N-2 buffers + timestamp + seq_id.
+    m_out_mats.resize(m_num_outputs - 2);
+    for (auto& m : m_out_mats) {
+        m_pipeline_outputs += cv::gout(m);
+    }
+    m_pipeline_outputs += cv::gout(m_start_ts);
+    m_pipeline_outputs += cv::gout(m_seq_id);
+}
+
 void Pipeline::run() {
     using namespace std::chrono;
 
+    // NB: Allocate outputs for execution.
+    prepareOutputs();
+
+    // NB: The warm-up iteration invalidates the source state,
+    // so run it on a copy.
+    auto orig_src = m_src;
+    auto copy_src = std::make_shared<DummySource>(*m_src);
+
+    // NB: Use the copy for the warm-up iteration.
+    m_src = copy_src;
+
+    // NB: Warm-up iteration.
+    init();
+    run_iter();
+    deinit();
+
+    // NB: Calculate the first latency.
+    m_perf.first_latency = utils::double_ms_t{
+        microseconds{utils::timestamp<microseconds>() - m_start_ts}}.count();
+
+    // NB: Now use the original source.
+    m_src = orig_src;
+
+    // NB: Start measuring the execution.
     init();
     auto start = high_resolution_clock::now();
     m_stop_criterion->start();
+
     while (true) {
-        m_perf.latencies.push_back(run_iter());
-        m_perf.elapsed = duration_cast<milliseconds>(high_resolution_clock::now() - start).count();
+        run_iter();
+        const auto latency = utils::double_ms_t{
+            microseconds{utils::timestamp<microseconds>() - m_start_ts}}.count();
+
+        m_perf.latencies.push_back(latency);
+        m_perf.seq_ids.push_back(m_seq_id);
+
         m_stop_criterion->iter();
         if (m_stop_criterion->done()) {
+            m_perf.elapsed = duration_cast<utils::double_ms_t>(
+                high_resolution_clock::now() - start).count();
             deinit();
             break;
         }
     }
 
-    m_perf.avg_latency = utils::avg(m_perf.latencies);
-    m_perf.min_latency = utils::min(m_perf.latencies);
-    m_perf.max_latency = utils::max(m_perf.latencies);
-    m_perf.first_latency = m_perf.latencies[0];
+    m_perf.avg_latency = utils::avg(m_perf.latencies);
+    m_perf.min_latency = utils::min(m_perf.latencies);
+    m_perf.max_latency = utils::max(m_perf.latencies);
 
-    // NB: Count how many executions don't fit into camera latency interval.
-    m_perf.num_late_frames =
-        std::count_if(m_perf.latencies.begin(), m_perf.latencies.end(),
-                      [this](int64_t latency) {
-                          return static_cast<double>(latency) > m_src->latency();
-                      });
+    // NB: Count the number of dropped frames.
+    int64_t prev_seq_id = m_perf.seq_ids[0];
+    for (size_t i = 1; i < m_perf.seq_ids.size(); ++i) {
+        m_perf.num_late_frames += m_perf.seq_ids[i] - prev_seq_id - 1;
+        prev_seq_id = m_perf.seq_ids[i];
+    }
 
-    m_perf.throughput =
-        (m_perf.latencies.size() / static_cast<double>(m_perf.elapsed)) * 1000;
+    m_perf.throughput = (m_perf.latencies.size() / m_perf.elapsed) * 1000;
 }
 
 const PerfReport& Pipeline::report() const {
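A quick sanity check of the metrics above, on assumed numbers: the warm-up iteration stays out of latencies, and throughput is derived from the measured loop alone, exactly as the last line of run() computes it:

    #include <iostream>
    #include <vector>

    int main() {
        const std::vector<double> latencies(300, 21.5); // 300 measured iterations (ms each)
        const double elapsed = 6000.0;                  // wall time of the loop (ms)

        // Iterations per elapsed millisecond, scaled to frames per second.
        const double throughput = (latencies.size() / elapsed) * 1000; // 50 FPS
        std::cout << throughput << " FPS\n";
    }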
@@ -155,13 +207,6 @@ private:
     }
 
     virtual void init() override {
-        using namespace std::chrono;
-        // NB: N-1 buffers + timestamp.
-        m_out_mats.resize(m_num_outputs - 1);
-        for (auto& m : m_out_mats) {
-            m_pipeline_outputs += cv::gout(m);
-        }
-        m_pipeline_outputs += cv::gout(m_start_ts);
         m_compiled.setSource(m_src);
         m_compiled.start();
     }
 
     virtual void deinit() override {
         m_compiled.stop();
     }
 
-    virtual int64_t run_iter() override {
+    virtual void run_iter() override {
         m_compiled.pull(cv::GRunArgsP{m_pipeline_outputs});
-        return utils::timestamp<std::chrono::milliseconds>() - m_start_ts;
     }
 
     cv::GStreamingCompiled m_compiled;
-    cv::GRunArgsP          m_pipeline_outputs;
-    std::vector<cv::Mat>   m_out_mats;
-    int64_t                m_start_ts;
 };
 
 class RegularPipeline : public Pipeline {
@@ -192,26 +233,13 @@ private:
                                cv::GCompileArgs(m_args));
     }
 
-    virtual void init() override {
-        m_out_mats.resize(m_num_outputs);
-        for (auto& m : m_out_mats) {
-            m_pipeline_outputs += cv::gout(m);
-        }
-    }
-
-    virtual int64_t run_iter() override {
-        using namespace std::chrono;
-        cv::gapi::wip::Data d;
-        m_src->pull(d);
-        auto in_mat = cv::util::get<cv::Mat>(d);
-        return utils::measure<milliseconds>([&]{
-            m_compiled(cv::gin(in_mat), cv::GRunArgsP{m_pipeline_outputs});
-        });
+    virtual void run_iter() override {
+        cv::gapi::wip::Data data;
+        m_src->pull(data);
+        m_compiled({data}, cv::GRunArgsP{m_pipeline_outputs});
     }
 
-    cv::GCompiled          m_compiled;
-    cv::GRunArgsP          m_pipeline_outputs;
-    std::vector<cv::Mat>   m_out_mats;
+    cv::GCompiled m_compiled;
 };
 
 enum class PLMode {
diff --git a/modules/gapi/samples/pipeline_modeling_tool/pipeline_builder.hpp b/modules/gapi/samples/pipeline_modeling_tool/pipeline_builder.hpp
index 6ac6374f07..3964b68a86 100644
--- a/modules/gapi/samples/pipeline_modeling_tool/pipeline_builder.hpp
+++ b/modules/gapi/samples/pipeline_modeling_tool/pipeline_builder.hpp
@@ -163,13 +163,10 @@ struct DummyCall {
                     cv::Mat&    out_mat,
                     DummyState& state) {
         using namespace std::chrono;
-        double total = 0;
-        auto start = high_resolution_clock::now();
+        auto start_ts = utils::timestamp<utils::double_ms_t>();
         state.mat.copyTo(out_mat);
-        while (total < time) {
-            total = duration_cast<duration<double, std::milli>>(
-                    high_resolution_clock::now() - start).count();
-        }
+        auto elapsed = utils::timestamp<utils::double_ms_t>() - start_ts;
+        utils::busyWait(duration_cast<microseconds>(utils::double_ms_t{time - elapsed}));
     }
 };
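The DummyCall change above stops spinning for the full simulated time and instead busy-waits only for whatever part of the budget the copyTo payload has not already consumed. A standalone sketch of that idea (steady_clock and the double_ms alias are stand-ins here, not the tool's utils):

    #include <chrono>
    #include <iostream>

    int main() {
        using namespace std::chrono;
        using double_ms = duration<double, std::milli>;

        const double time  = 5.0; // simulated node latency, ms
        const auto   start = steady_clock::now();

        // ... payload work goes here (the real node does state.mat.copyTo) ...

        // Busy-wait only for the unused part of the budget;
        // a negative remainder simply skips the loop.
        const double elapsed   = duration_cast<double_ms>(steady_clock::now() - start).count();
        const auto   remaining = duration_cast<microseconds>(double_ms{time - elapsed});
        const auto   deadline  = steady_clock::now() + remaining;
        while (steady_clock::now() < deadline) {}

        std::cout << "took ~"
                  << duration_cast<double_ms>(steady_clock::now() - start).count()
                  << " ms (target " << time << " ms)\n";
    }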
@@ -656,16 +653,16 @@ Pipeline::Ptr PipelineBuilder::construct() {
     }
 
     GAPI_Assert(m_state->stop_criterion);
-    if (m_state->mode == PLMode::STREAMING) {
-        GAPI_Assert(graph_inputs.size() == 1);
-        GAPI_Assert(cv::util::holds_alternative<cv::GMat>(graph_inputs[0]));
-        // FIXME: Handle GFrame when NV12 comes.
-        const auto& graph_input = cv::util::get<cv::GMat>(graph_inputs[0]);
-        // NB: In case streaming mode need to expose timestamp in order to
-        // calculate performance metrics.
-        graph_outputs.emplace_back(
-                cv::gapi::streaming::timestamp(graph_input).strip());
+    GAPI_Assert(graph_inputs.size() == 1);
+    GAPI_Assert(cv::util::holds_alternative<cv::GMat>(graph_inputs[0]));
+    // FIXME: Handle GFrame when NV12 comes.
+    const auto& graph_input = cv::util::get<cv::GMat>(graph_inputs[0]);
+    graph_outputs.emplace_back(
+            cv::gapi::streaming::timestamp(graph_input).strip());
+    graph_outputs.emplace_back(
+            cv::gapi::streaming::seq_id(graph_input).strip());
+
+    if (m_state->mode == PLMode::STREAMING) {
         return std::make_shared<StreamingPipeline>(std::move(m_state->name),
                                                    cv::GComputation(
                                                        cv::GProtoInputArgs{graph_inputs},
diff --git a/modules/gapi/samples/pipeline_modeling_tool/utils.hpp b/modules/gapi/samples/pipeline_modeling_tool/utils.hpp
index c0f0897c35..0297aed53a 100644
--- a/modules/gapi/samples/pipeline_modeling_tool/utils.hpp
+++ b/modules/gapi/samples/pipeline_modeling_tool/utils.hpp
@@ -17,6 +17,8 @@ struct OutputDescr {
 
 namespace utils {
 
+using double_ms_t = std::chrono::duration<double, std::milli>;
+
 inline void createNDMat(cv::Mat& mat, const std::vector<int>& dims, int depth) {
     GAPI_Assert(!dims.empty());
     mat.create(dims, depth);
@@ -50,10 +52,8 @@ inline void generateRandom(cv::Mat& out) {
     }
 }
 
-inline void sleep(double ms) {
+inline void sleep(std::chrono::microseconds delay) {
 #if defined(_WIN32)
-    // NB: It takes portions of 100 nanoseconds.
-    int64_t ns_units = static_cast<int64_t>(ms * 1e4);
     // FIXME: Wrap it in RAII and instantiate only once.
     HANDLE timer = CreateWaitableTimer(NULL, true, NULL);
     if (!timer) {
         throw std::logic_error("Failed to create timer");
     }
 
     LARGE_INTEGER li;
-    li.QuadPart = -ns_units;
+    using ns_t = std::chrono::nanoseconds;
+    using ns_100_t = std::chrono::duration<ns_t::rep,
+                                           std::ratio_multiply<std::ratio<100>, ns_t::period>>;
+    // NB: QuadPart takes portions of 100 nanoseconds.
+    li.QuadPart = -std::chrono::duration_cast<ns_100_t>(delay).count();
+
     if(!SetWaitableTimer(timer, &li, 0, NULL, NULL, false)){
         CloseHandle(timer);
         throw std::logic_error("Failed to set timer");
     }
@@ -72,8 +77,7 @@
     }
     CloseHandle(timer);
 #else
-    using namespace std::chrono;
-    std::this_thread::sleep_for(duration<double, std::milli>(ms));
+    std::this_thread::sleep_for(delay);
 #endif
 }
 
@@ -93,6 +97,16 @@ typename duration_t::rep timestamp() {
     return duration_cast<duration_t>(now.time_since_epoch()).count();
 }
 
+inline void busyWait(std::chrono::microseconds delay) {
+    auto start_ts     = timestamp<std::chrono::microseconds>();
+    auto end_ts       = start_ts;
+    auto time_to_wait = delay.count();
+
+    while (end_ts - start_ts < time_to_wait) {
+        end_ts = timestamp<std::chrono::microseconds>();
+    }
+}
+
 template <typename K, typename V>
 void mergeMapWith(std::map<K, V>& target, const std::map<K, V>& second) {
     for (auto&& item : second) {
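A closing note on the new Windows sleep path: ns_100_t models the 100-nanosecond ticks that SetWaitableTimer's LARGE_INTEGER expects, so the duration_cast multiplies a microsecond count by ten (and the sign flip requests a relative, rather than absolute, due time). A standalone check of that ratio arithmetic (not the tool's code):

    #include <chrono>
    #include <iostream>
    #include <ratio>

    int main() {
        using ns_t     = std::chrono::nanoseconds;
        using ns_100_t = std::chrono::duration<ns_t::rep,
                             std::ratio_multiply<std::ratio<100>, ns_t::period>>;

        // 1500us = 1.5ms = 15000 ticks of 100ns each.
        const auto ticks = std::chrono::duration_cast<ns_100_t>(
            std::chrono::microseconds{1500}).count();
        std::cout << ticks << "\n"; // prints: 15000
    }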