// This file is part of OpenCV project. // It is subject to the license terms in the LICENSE file found in the top-level directory // of this distribution and at http://opencv.org/license.html. // // Copyright (C) 2019 Intel Corporation #include "test_precomp.hpp" #include #include #include #include #include namespace opencv_test { //Main idea behind these tests is to have the same test script that is parameterized in order to test all setups (GCompiled vs apply, callback vs future). //So these differences are factored into devoted helper classes (mixins) which are then used by the common test script by help of CRTP. //Actual GAPI Computation with parameters to run on is mixed into test via CRTP as well. struct SumOfSum2x2 { cv::GComputation sum_of_sum; SumOfSum2x2() : sum_of_sum([]{ cv::GMat in; cv::GScalar out = cv::gapi::sum(in + in); return GComputation{in, out}; }) {} const cv::Size sz{2, 2}; cv::Mat in_mat{sz, CV_8U, cv::Scalar(1)}; cv::Scalar out_sc; cv::GCompiled compile(){ return sum_of_sum.compile(descr_of(in_mat)); } cv::GComputation& computation(){ return sum_of_sum; } cv::GCompileArgs compile_args(){ return {}; } cv::GRunArgs in_args(){ return cv::gin(in_mat); } cv::GRunArgsP out_args(){ return cv::gout(out_sc); } void verify(){ EXPECT_EQ(8, out_sc[0]); } }; namespace { G_TYPED_KERNEL(GThrow, , "org.opencv.test.throw") { static GMatDesc outMeta(GMatDesc in) { return in; } }; struct gthrow_exception : std::runtime_error { using std::runtime_error::runtime_error; }; GAPI_OCV_KERNEL(GThrowImpl, GThrow) { static void run(const cv::Mat& in, cv::Mat&) { //this condition is needed to avoid "Unreachable code" warning on windows inside OCVCallHelper if (!in.empty()) { throw gthrow_exception{"test"}; } } }; //TODO: unify with callback helper code struct cancel_struct { std::atomic num_tasks_to_spawn; cv::gapi::wip::GAsyncContext ctx; cancel_struct(int tasks_to_spawn) : num_tasks_to_spawn(tasks_to_spawn) {} }; G_TYPED_KERNEL(GCancelationAdHoc, , "org.opencv.test.cancel_ad_hoc") { static GMatDesc outMeta(GMatDesc in, cancel_struct* ) { return in; } }; GAPI_OCV_KERNEL(GCancelationAdHocImpl, GCancelationAdHoc) { static void run(const cv::Mat& , cancel_struct* cancel_struct_p, cv::Mat&) { auto& cancel_struct_ = * cancel_struct_p; auto num_tasks_to_spawn = -- cancel_struct_.num_tasks_to_spawn; cancel_struct_.ctx.cancel(); EXPECT_GT(num_tasks_to_spawn, 0)<<"Incorrect Test setup - to small number of tasks to feed the queue \n"; } }; } struct ExceptionOnExecution { cv::GComputation throwing_gcomp; ExceptionOnExecution() : throwing_gcomp([]{ cv::GMat in; auto gout = GThrow::on(in); return GComputation{in, gout}; }) {} const cv::Size sz{2, 2}; cv::Mat in_mat{sz, CV_8U, cv::Scalar(1)}; cv::Mat out; cv::GCompiled compile(){ return throwing_gcomp.compile(descr_of(in_mat), compile_args()); } cv::GComputation& computation(){ return throwing_gcomp; } cv::GRunArgs in_args(){ return cv::gin(in_mat); } cv::GRunArgsP out_args(){ return cv::gout(out); } cv::GCompileArgs compile_args(){ auto pkg = cv::gapi::kernels(); return cv::compile_args(pkg); } }; struct SelfCanceling { cv::GComputation self_cancel; SelfCanceling(cancel_struct* cancel_struct_p) : self_cancel([cancel_struct_p]{ cv::GMat in; cv::GMat out = GCancelationAdHoc::on(in, cancel_struct_p); return GComputation{in, out}; }) {} const cv::Size sz{2, 2}; cv::Mat in_mat{sz, CV_8U, cv::Scalar(1)}; cv::Mat out_mat; cv::GCompiled compile(){ return self_cancel.compile(descr_of(in_mat), compile_args()); } cv::GComputation& computation(){ return self_cancel; } cv::GRunArgs in_args(){ return cv::gin(in_mat); } cv::GRunArgsP out_args(){ return cv::gout(out_mat); } cv::GCompileArgs compile_args(){ auto pkg = cv::gapi::kernels(); return cv::compile_args(pkg); } }; template struct crtp_cast { template static crtp_final_t* crtp_cast_(crtp_base_t* this_) { return static_cast(this_); } }; //Test Mixin, hiding details of callback based notification template struct CallBack: crtp_cast { std::atomic callback_called = {false}; std::mutex mtx; std::exception_ptr ep; std::condition_variable cv; std::function callback(){ return [&](std::exception_ptr ep_){ ep = ep_; callback_called = true; mtx.lock(); mtx.unlock(); cv.notify_one(); }; }; template void start_async(Args&&... args){ this->crtp_cast_(this)->async(callback(), std::forward(args)...); } template void start_async(cv::gapi::wip::GAsyncContext& ctx, Args&&... args){ this->crtp_cast_(this)->async(ctx, callback(), std::forward(args)...); } void wait_for_result() { std::unique_lock lck{mtx}; cv.wait(lck,[&]{return callback_called == true;}); if (ep) { std::rethrow_exception(ep); } } }; //Test Mixin, hiding details of future based notification template struct Future: crtp_cast { std::future f; template void start_async(Args&&... args){ f = this->crtp_cast_(this)->async(std::forward(args)...); } void wait_for_result() { f.get(); } }; //Test Mixin, hiding details of using compiled GAPI object template struct AsyncCompiled : crtp_cast{ template auto async(Args&&... args) -> decltype(cv::gapi::wip::async(std::declval(), std::forward(args)...)){ auto gcmpld = this->crtp_cast_(this)->compile(); return cv::gapi::wip::async(gcmpld, std::forward(args)...); } template auto async(cv::gapi::wip::GAsyncContext& ctx, Args&&... args) -> decltype(cv::gapi::wip::async(std::declval(), std::forward(args)..., std::declval())) { auto gcmpld = this->crtp_cast_(this)->compile(); return cv::gapi::wip::async(gcmpld, std::forward(args)..., ctx); } }; //Test Mixin, hiding details of calling apply (async_apply) on GAPI Computation object template struct AsyncApply : crtp_cast { template auto async(Args&&... args) -> decltype(cv::gapi::wip::async_apply(std::declval(), std::forward(args)..., std::declval())) { return cv::gapi::wip::async_apply( this->crtp_cast_(this)->computation(), std::forward(args)..., this->crtp_cast_(this)->compile_args() ); } template auto async(cv::gapi::wip::GAsyncContext& ctx, Args&&... args) -> decltype(cv::gapi::wip::async_apply(std::declval(), std::forward(args)... , std::declval(), std::declval())) { return cv::gapi::wip::async_apply( this->crtp_cast_(this)->computation(), std::forward(args)..., this->crtp_cast_(this)->compile_args(), ctx ); } }; template struct normal: ::testing::Test, case_t{}; TYPED_TEST_CASE_P(normal); TYPED_TEST_P(normal, basic){ //Normal scenario: start function asynchronously and wait for the result, and verify it this->start_async(this->in_args(), this->out_args()); this->wait_for_result(); this->verify(); } REGISTER_TYPED_TEST_CASE_P(normal, basic ); template struct exception: ::testing::Test, case_t{}; TYPED_TEST_CASE_P(exception); TYPED_TEST_P(exception, basic){ //Exceptional scenario: start function asynchronously and make sure exception is passed to the user this->start_async(this->in_args(), this->out_args()); EXPECT_THROW(this->wait_for_result(), gthrow_exception); } REGISTER_TYPED_TEST_CASE_P(exception, basic ); template struct stress : ::testing::Test{}; TYPED_TEST_CASE_P(stress); TYPED_TEST_P(stress, test){ //Some stress testing: use a number of threads to start a bunch of async requests const std::size_t request_per_thread = 10; const std::size_t number_of_threads = 4; auto thread_body = [&](){ std::vector requests(request_per_thread); for (auto&& r : requests){ r.start_async(r.in_args(), r.out_args()); } for (auto&& r : requests){ r.wait_for_result(); r.verify(); } }; std::vector pool {number_of_threads}; for (auto&& t : pool){ t = std::thread{thread_body}; } for (auto&& t : pool){ t.join(); } } REGISTER_TYPED_TEST_CASE_P(stress, test); template struct cancel : ::testing::Test{}; TYPED_TEST_CASE_P(cancel); TYPED_TEST_P(cancel, basic){ constexpr int num_tasks = 100; cancel_struct cancel_struct_ {num_tasks}; std::vector requests; requests.reserve(num_tasks); for (auto i = num_tasks; i>0; i--){ requests.emplace_back(&cancel_struct_); } for (auto&& r : requests){ //first request will cancel other on it's execution r.start_async(cancel_struct_.ctx, r.in_args(), r.out_args()); } unsigned int canceled = 0 ; for (auto&& r : requests){ try { r.wait_for_result(); }catch (cv::gapi::wip::GAsyncCanceled&){ ++canceled; } } ASSERT_GT(canceled, 0u); } namespace { GRunArgs deep_copy_out_args(const GRunArgsP& args ){ GRunArgs result; result.reserve(args.size()); for (auto&& arg : args){ //FIXME: replace this switch with use of visit() on variant, when it will be available switch (arg.index()){ #if !defined(GAPI_STANDALONE) case GRunArgP::index_of() : result.emplace_back(*util::get(arg)); break; case GRunArgP::index_of() : result.emplace_back(*util::get(arg)); break; #endif // !defined(GAPI_STANDALONE) case GRunArgP::index_of() : result.emplace_back(*util::get (arg)); break; case GRunArgP::index_of() : result.emplace_back(*util::get (arg)); break; case GRunArgP::index_of() : result.emplace_back(util::get (arg)); break; default : ; } } return result; } GRunArgsP args_p_from_args(GRunArgs& args){ GRunArgsP result; result.reserve(args.size()); for (auto&& arg : args){ switch (arg.index()){ #if !defined(GAPI_STANDALONE) case GRunArg::index_of() : result.emplace_back(&util::get(arg)); break; case GRunArg::index_of() : result.emplace_back(&util::get(arg)); break; #endif // !defined(GAPI_STANDALONE) case GRunArg::index_of() : result.emplace_back(&util::get (arg)); break; case GRunArg::index_of() : result.emplace_back(&util::get (arg)); break; case GRunArg::index_of() : result.emplace_back(util::get (arg)); break; default : ; } } return result; } } REGISTER_TYPED_TEST_CASE_P(cancel, basic); template struct output_args_lifetime : ::testing::Test{ static constexpr const int num_of_requests = 20; }; TYPED_TEST_CASE_P(output_args_lifetime); //There are intentionally no actual checks (asserts and verify) in output_args_lifetime tests. //They are more of example use-cases than real tests. (ASAN/valgrind can still catch issues here) TYPED_TEST_P(output_args_lifetime, callback){ std::atomic active_requests = {0}; for (int i=0; inum_of_requests; i++) { TypeParam r; //As output arguments are __captured by reference__ calling code //__must__ ensure they live long enough to complete asynchronous activity. //(i.e. live at least until callback is called) auto out_args_ptr = std::make_shared(deep_copy_out_args(r.out_args())); //Extend lifetime of out_args_ptr content by capturing it into a callback auto cb = [&active_requests, out_args_ptr](std::exception_ptr ){ --active_requests; }; ++active_requests; r.async(cb, r.in_args(), args_p_from_args(*out_args_ptr)); } while(active_requests){ std::this_thread::sleep_for(std::chrono::milliseconds{2}); } } TYPED_TEST_P(output_args_lifetime, future){ std::vector> fs(this->num_of_requests); std::vector> out_ptrs(this->num_of_requests); for (int i=0; inum_of_requests; i++) { TypeParam r; //As output arguments are __captured by reference__ calling code //__must__ ensure they live long enough to complete asynchronous activity. //(i.e. live at least until future.get()/wait() is returned) auto out_args_ptr = std::make_shared(deep_copy_out_args(r.out_args())); //Extend lifetime of out_args_ptr content out_ptrs[i] = out_args_ptr; fs[i] = r.async(r.in_args(), args_p_from_args(*out_args_ptr)); } for (auto const& ftr : fs ){ ftr.wait(); } } REGISTER_TYPED_TEST_CASE_P(output_args_lifetime, callback, future); //little helpers to match up all combinations of setups template class... args_t> struct Case : compute_fixture_t, args_t> ... { template Case(Args&&... args) : compute_fixture_t(std::forward(args)...) { } Case(Case const & ) = default; Case(Case && ) = default; Case() = default; }; template using cases = ::testing::Types< Case, Case, Case, Case >; INSTANTIATE_TYPED_TEST_CASE_P(AsyncAPINormalFlow_, normal, cases); INSTANTIATE_TYPED_TEST_CASE_P(AsyncAPIExceptionHandling_, exception, cases); INSTANTIATE_TYPED_TEST_CASE_P(AsyncAPIStress, stress, cases); INSTANTIATE_TYPED_TEST_CASE_P(AsyncAPICancelation, cancel, cases); template using explicit_wait_cases = ::testing::Types< Case, Case, Case, Case >; INSTANTIATE_TYPED_TEST_CASE_P(AsyncAPIOutArgsLifetTime, output_args_lifetime, explicit_wait_cases); } // namespace opencv_test