Merge pull request #21731 from AsyaPronina:asyadev/fix_new_stream_event

Fixed handling of new stream, especially for stateful OCV kernels

* Fixed handling of new stream, especially for stateful OCV kernels

* Removed duplication from StateInitOnce tests

* Addressed review comments for PR #21731
- Fixed explanation comments
- Expanded test for stateful OCV kernels in Regular mode

* Addressed review comments for PR #21731
- Moved notification about new stream to the constructor
- Added test on state reset for Regular mode

* Addresed review comments

* Addressed review comments

Co-authored-by: Ruslan Garnov <ruslan.garnov@intel.com>
pull/21737/head
Anastasiya(Asya) Pronina 3 years ago committed by GitHub
parent 4754b0e253
commit 91a5e75151
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 32
      modules/gapi/src/backends/cpu/gcpubackend.cpp
  2. 4
      modules/gapi/src/backends/cpu/gcpubackend.hpp
  3. 18
      modules/gapi/src/executor/gexecutor.cpp
  4. 14
      modules/gapi/src/executor/gstreamingexecutor.cpp
  5. 184
      modules/gapi/test/cpu/gapi_ocv_stateful_kernel_tests.cpp

@ -27,6 +27,7 @@
#include "api/gbackend_priv.hpp" // FIXME: Make it part of Backend SDK!
#include "utils/itt.hpp"
#include "logger.hpp"
// FIXME: Is there a way to take a typed graph (our GModel),
// and create a new typed graph _ATOP_ of that (by extending with a couple of
@ -113,8 +114,6 @@ cv::gimpl::GCPUExecutable::GCPUExecutable(const ade::Graph &g,
}
}
makeReshape();
// For each stateful kernel call 'setup' user callback to initialize state.
setupKernelStates();
}
// FIXME: Document what it does
@ -190,18 +189,23 @@ void cv::gimpl::GCPUExecutable::makeReshape() {
void cv::gimpl::GCPUExecutable::reshape(ade::Graph&, const GCompileArgs& args) {
m_compileArgs = args;
makeReshape();
// Signal to reset stateful kernels` state.
// There can be no handleNewStream() call to set this flag
// if user didn't call GCompiled`s prepareForNewStream()
m_newStreamStarted = true;
// TODO: Add an input meta sensitivity flag to stateful kernels.
// When reshape() happens, reset state for meta-sensitive kernels only
if (!m_nodesToStates.empty()) {
std::call_once(m_warnFlag,
[](){
GAPI_LOG_WARNING(NULL,
"\nGCPUExecutable::reshape was called. Resetting states of stateful kernels.");
});
setupKernelStates();
}
}
void cv::gimpl::GCPUExecutable::handleNewStream()
{
// Signal to reset stateful kernels` state.
// No need to call reshape() here since it'll
// be called automatically if input meta was changed
m_newStreamStarted = true;
// In case if new video-stream happens - for each stateful kernel
// call 'setup' user callback to re-initialize state.
setupKernelStates();
}
void cv::gimpl::GCPUExecutable::run(std::vector<InObj> &&input_objs,
@ -231,14 +235,6 @@ void cv::gimpl::GCPUExecutable::run(std::vector<InObj> &&input_objs,
}
}
// In case if new video-stream happens - for each stateful kernel
// call 'setup' user callback to re-initialize state.
if (m_newStreamStarted)
{
setupKernelStates();
m_newStreamStarted = false;
}
// OpenCV backend execution is not a rocket science at all.
// Simply invoke our kernels in the proper order.
GConstGCPUModel gcm(m_g);

@ -56,8 +56,8 @@ class GCPUExecutable final: public GIslandExecutable
// Actual data of all resources in graph (both internal and external)
Mag m_res;
// Flag which identifies if new stream was started
bool m_newStreamStarted = false;
// A flag for call_once() (used for log warnings)
std::once_flag m_warnFlag;
GArg packArg(const GArg &arg);
void setupKernelStates();

@ -30,10 +30,11 @@ cv::gimpl::GExecutor::GExecutor(std::unique_ptr<ade::Graph> &&g_model)
// 1. Allocate all internal resources first (NB - CPU plugin doesn't do it)
// 2. Put input/output GComputation arguments to the storage
// 3. For every Island, prepare vectors of input/output parameter descs
// 4. Iterate over a list of operations (sorted in the topological order)
// 5. For every operation, form a list of input/output data objects
// 6. Run GIslandExecutable
// 7. writeBack
// 4. Ask every GIslandExecutable to prepare its internal states for a new stream
// 5. Iterate over a list of operations (sorted in the topological order)
// 6. For every operation, form a list of input/output data objects
// 7. Run GIslandExecutable
// 8. writeBack
auto sorted = m_gim.metadata().get<ade::passes::TopologicalSortData>();
for (auto nh : sorted.nodes())
@ -82,6 +83,9 @@ cv::gimpl::GExecutor::GExecutor(std::unique_ptr<ade::Graph> &&g_model)
break;
} // switch(kind)
} // for(gim nodes)
// (4)
prepareForNewStream();
}
namespace cv {
@ -401,10 +405,10 @@ void cv::gimpl::GExecutor::run(cv::gimpl::GRuntimeArgs &&args)
magazine::resetInternalData(m_res, data);
}
// Run the script
// Run the script (5)
for (auto &op : m_ops)
{
// (5), (6)
// (6), (7)
Input i{m_res, op.in_objects};
Output o{m_res, op.out_objects};
op.isl_exec->run(i, o);
@ -412,7 +416,7 @@ void cv::gimpl::GExecutor::run(cv::gimpl::GRuntimeArgs &&args)
o.verify();
}
// (7)
// (8)
for (auto it : ade::util::zip(ade::util::toRange(proto.outputs),
ade::util::toRange(args.outObjs)))
{

@ -1564,7 +1564,7 @@ void cv::gimpl::GStreamingExecutor::setSource(GRunArgs &&ins)
}
}
};
bool islandsRecompiled = false;
const auto new_meta = cv::descr_of(ins); // 0
if (gm.metadata().contains<OriginalInputMeta>()) // (1)
{
@ -1586,8 +1586,6 @@ void cv::gimpl::GStreamingExecutor::setSource(GRunArgs &&ins)
}
update_int_metas(); // (7)
m_reshapable = util::make_optional(is_reshapable);
islandsRecompiled = true;
}
else // (8)
{
@ -1709,14 +1707,8 @@ void cv::gimpl::GStreamingExecutor::setSource(GRunArgs &&ins)
island_meta_info = GIslandModel::traceIslandName(op.nh, m_gim);
#endif // OPENCV_WITH_ITT
// If Island Executable is recompiled, all its stuff including internal kernel states
// are recreated and re-initialized automatically.
// But if not, we should notify Island Executable about new started stream to let it update
// its internal variables.
if (!islandsRecompiled)
{
op.isl_exec->handleNewStream();
}
// Notify island executable about a new stream to let it update its internal variables.
op.isl_exec->handleNewStream();
m_threads.emplace_back(islandActorThread,
op.in_objects,

@ -14,6 +14,7 @@
#include <opencv2/video.hpp>
#endif
#include <memory> // required by std::shared_ptr
namespace opencv_test
{
@ -21,6 +22,11 @@ namespace opencv_test
{
std::string method;
};
struct CountStateSetupsParams
{
std::shared_ptr<int> pSetupsCount;
};
} // namespace opencv_test
namespace cv
@ -34,6 +40,14 @@ namespace cv
return "org.opencv.test.background_substractor_state_params";
}
};
template<> struct CompileArgTag<opencv_test::CountStateSetupsParams>
{
static const char* tag()
{
return "org.opencv.test.count_state_setups_params";
}
};
} // namespace detail
} // namespace cv
@ -127,8 +141,101 @@ namespace
}
};
#endif
G_TYPED_KERNEL(GCountStateSetups, <cv::GOpaque<bool>(GMat)>,
"org.opencv.test.count_state_setups")
{
static GOpaqueDesc outMeta(GMatDesc /* in */) { return empty_gopaque_desc(); }
};
GAPI_OCV_KERNEL_ST(GOCVCountStateSetups, GCountStateSetups, int)
{
static void setup(const cv::GMatDesc &, std::shared_ptr<int> &,
const cv::GCompileArgs &compileArgs)
{
auto params = cv::gapi::getCompileArg<CountStateSetupsParams>(compileArgs)
.value_or(CountStateSetupsParams { });
if (params.pSetupsCount != nullptr) {
(*params.pSetupsCount)++;
}
}
static void run(const cv::Mat & , bool &out, int &)
{
out = true;
}
};
};
TEST(StatefulKernel, StateInitOnceInRegularMode)
{
cv::GMat in;
cv::GOpaque<bool> out = GCountStateSetups::on(in);
cv::GComputation c(cv::GIn(in), cv::GOut(out));
// Input mat:
cv::Mat inputData(1080, 1920, CV_8UC1);
cv::randu(inputData, cv::Scalar::all(1), cv::Scalar::all(128));
// variable to update when state is initialized in the kernel
CountStateSetupsParams params;
params.pSetupsCount.reset(new int(0));
// Testing for 100 frames
bool result { };
for (int i = 0; i < 100; ++i) {
c.apply(cv::gin(inputData), cv::gout(result),
cv::compile_args(cv::gapi::kernels<GOCVCountStateSetups>(), params));
EXPECT_TRUE(result);
EXPECT_TRUE(params.pSetupsCount != nullptr);
EXPECT_EQ(1, *params.pSetupsCount);
}
};
struct StateInitOnce : public ::testing::TestWithParam<bool>{};
TEST_P(StateInitOnce, StreamingCompiledWithMeta)
{
bool compileWithMeta = GetParam();
cv::GMat in;
cv::GOpaque<bool> out = GCountStateSetups::on(in);
cv::GComputation c(cv::GIn(in), cv::GOut(out));
// Input mat:
cv::Mat inputData(1080, 1920, CV_8UC1);
cv::randu(inputData, cv::Scalar::all(1), cv::Scalar::all(128));
// variable to update when state is initialized in the kernel
CountStateSetupsParams params;
params.pSetupsCount.reset(new int(0));
// Compilation & testing
auto ccomp = (compileWithMeta)
? c.compileStreaming(cv::descr_of(inputData),
cv::compile_args(cv::gapi::kernels<GOCVCountStateSetups>(),
params))
: c.compileStreaming(
cv::compile_args(cv::gapi::kernels<GOCVCountStateSetups>(),
params));
ccomp.setSource(cv::gin(inputData));
ccomp.start();
EXPECT_TRUE(ccomp.running());
int counter { };
bool result;
// Process mat 100 times
while (ccomp.pull(cv::gout(result)) && (counter++ < 100)) {
EXPECT_TRUE(params.pSetupsCount != nullptr);
EXPECT_EQ(1, *params.pSetupsCount);
}
ccomp.stop();
EXPECT_FALSE(ccomp.running());
}
INSTANTIATE_TEST_CASE_P(StatefulKernel, StateInitOnce, ::testing::Bool());
TEST(StatefulKernel, StateIsMutableInRuntime)
{
constexpr int expectedCallsCount = 10;
@ -163,7 +270,43 @@ TEST(StatefulKernel, StateIsMutableInRuntime)
}
TEST(StatefulKernel, StateIsAutoResetForNewStream)
TEST(StateIsResetOnNewStream, RegularMode)
{
cv::GMat in;
cv::GOpaque<bool> out = GCountStateSetups::on(in);
cv::GComputation c(cv::GIn(in), cv::GOut(out));
// Input mat:
cv::Mat inputData(1080, 1920, CV_8UC1);
cv::randu(inputData, cv::Scalar::all(1), cv::Scalar::all(128));
// variable to update when state is initialized in the kernel
CountStateSetupsParams params;
params.pSetupsCount.reset(new int(0));
auto setupsCounter = c.compile(cv::descr_of(inputData),
cv::compile_args(cv::gapi::kernels<GOCVCountStateSetups>(),
params));
bool result { };
for (int i = 0; i < 2; ++i) {
setupsCounter(cv::gin(inputData), cv::gout(result));
EXPECT_TRUE(params.pSetupsCount != nullptr);
EXPECT_EQ(1, *params.pSetupsCount);
}
EXPECT_TRUE(params.pSetupsCount != nullptr);
EXPECT_EQ(1, *params.pSetupsCount);
setupsCounter.prepareForNewStream();
for (int i = 0; i < 2; ++i) {
setupsCounter(cv::gin(inputData), cv::gout(result));
EXPECT_TRUE(params.pSetupsCount != nullptr);
EXPECT_EQ(2, *params.pSetupsCount);
}
}
TEST(StateIsResetOnNewStream, StreamingMode)
{
cv::GMat in;
cv::GOpaque<bool> out = GIsStateUpToDate::on(in);
@ -387,6 +530,45 @@ TEST(StatefulKernel, StateIsChangedViaCompArgsOnReshape)
run("cv/video/768x576.avi", "knn");
run("cv/video/1920x1080.avi", "mog2");
}
TEST(StatefulKernel, StateIsResetOnceOnReshapeInStreaming)
{
cv::GMat in;
cv::GOpaque<bool> out = GCountStateSetups::on(in);
cv::GComputation c(cv::GIn(in), cv::GOut(out));
// variable to update when state is initialized in the kernel
CountStateSetupsParams params;
params.pSetupsCount.reset(new int(0));
auto ccomp = c.compileStreaming(
cv::compile_args(cv::gapi::kernels<GOCVCountStateSetups>(), params));
auto run = [&ccomp, &params](const std::string& videoPath, int expectedSetupsCount) {
auto path = findDataFile(videoPath);
try {
ccomp.setSource<cv::gapi::wip::GCaptureSource>(path);
} catch(...) {
throw SkipTestException("Video file can not be opened");
}
ccomp.start();
int frames = 0;
bool result = false;
while (ccomp.pull(cv::gout(result)) && (frames++ < 10)) {
EXPECT_TRUE(result);
EXPECT_TRUE(params.pSetupsCount != nullptr);
EXPECT_EQ(expectedSetupsCount, *params.pSetupsCount);
}
ccomp.stop();
};
run("cv/video/768x576.avi", 1);
// FIXME: it should be 2, not 3 for expectedSetupsCount here.
// With current implemention both GCPUExecutable reshape() and
// handleNewStream() call setupKernelStates()
run("cv/video/1920x1080.avi", 3);
}
#endif
TEST(StatefulKernel, StateIsAutoResetOnReshape)

Loading…
Cancel
Save