diff --git a/modules/gapi/include/opencv2/gapi/gcompiled_async.hpp b/modules/gapi/include/opencv2/gapi/gcompiled_async.hpp index 559f653217..96da0d7c38 100644 --- a/modules/gapi/include/opencv2/gapi/gcompiled_async.hpp +++ b/modules/gapi/include/opencv2/gapi/gcompiled_async.hpp @@ -24,6 +24,11 @@ namespace wip { //These functions asynchronously (i.e. probably on a separate thread of execution) call operator() member function of their first argument with copies of rest of arguments (except callback) passed in. //The difference between the function is the way to get the completion notification (via callback or a waiting on std::future object) //If exception is occurred during execution of apply it is transfered to the callback (via function parameter) or passed to future (and will be thrown on call to std::future::get) + + //N.B. : + //Input arguments are copied on call to async function (actually on call to cv::gin) and thus do not have to outlive the actual completion of asynchronous activity. + //While Output arguments are "captured" by reference(pointer) and therefore _must_ outlive the asynchronous activity + //(i.e. live at least until callback is called or future is unblocked) GAPI_EXPORTS void async(GCompiled& gcmpld, std::function&& callback, GRunArgs &&ins, GRunArgsP &&outs); GAPI_EXPORTS void async(GCompiled& gcmpld, std::function&& callback, GRunArgs &&ins, GRunArgsP &&outs, GAsyncContext& ctx); diff --git a/modules/gapi/include/opencv2/gapi/gcomputation_async.hpp b/modules/gapi/include/opencv2/gapi/gcomputation_async.hpp index 7444abe7eb..661e097bbf 100644 --- a/modules/gapi/include/opencv2/gapi/gcomputation_async.hpp +++ b/modules/gapi/include/opencv2/gapi/gcomputation_async.hpp @@ -25,6 +25,11 @@ namespace wip { //These functions asynchronously (i.e. probably on a separate thread of execution) call apply member function of their first argument with copies of rest of arguments (except callback) passed in. //The difference between the function is the way to get the completion notification (via callback or a waiting on std::future object) //If exception is occurred during execution of apply it is transfered to the callback (via function parameter) or passed to future (and will be thrown on call to std::future::get) + + //N.B. : + //Input arguments are copied on call to async function (actually on call to cv::gin) and thus do not have to outlive the actual completion of asynchronous activity. + //While Output arguments are "captured" by reference(pointer) and therefore _must_ outlive the asynchronous activity + //(i.e. live at least until callback is called or future is unblocked) GAPI_EXPORTS void async_apply(GComputation& gcomp, std::function&& callback, GRunArgs &&ins, GRunArgsP &&outs, GCompileArgs &&args = {}); GAPI_EXPORTS void async_apply(GComputation& gcomp, std::function&& callback, GRunArgs &&ins, GRunArgsP &&outs, GCompileArgs &&args, GAsyncContext& ctx); diff --git a/modules/gapi/test/gapi_async_test.cpp b/modules/gapi/test/gapi_async_test.cpp index d6fc5935dc..9702119b52 100644 --- a/modules/gapi/test/gapi_async_test.cpp +++ b/modules/gapi/test/gapi_async_test.cpp @@ -380,14 +380,115 @@ TYPED_TEST_P(cancel, basic){ ASSERT_GT(canceled, 0u); } +namespace { + GRunArgs deep_copy_out_args(const GRunArgsP& args ){ + GRunArgs result; result.reserve(args.size()); + for (auto&& arg : args){ + //FIXME: replace this switch with use of visit() on variant, when it will be available + switch (arg.index()){ + #if !defined(GAPI_STANDALONE) + case GRunArgP::index_of() : result.emplace_back(*util::get(arg)); break; + case GRunArgP::index_of() : result.emplace_back(*util::get(arg)); break; + case GRunArgP::index_of() : result.emplace_back(*util::get(arg)); break; + #endif // !defined(GAPI_STANDALONE) + case GRunArgP::index_of() : result.emplace_back(*util::get (arg)); break; + case GRunArgP::index_of() : result.emplace_back(*util::get(arg)); break; + case GRunArgP::index_of() : result.emplace_back(util::get (arg)); break; + default : ; + } + } + return result; + } + + GRunArgsP args_p_from_args(GRunArgs& args){ + GRunArgsP result; result.reserve(args.size()); + for (auto&& arg : args){ + switch (arg.index()){ + #if !defined(GAPI_STANDALONE) + case GRunArg::index_of() : result.emplace_back(&util::get(arg)); break; + case GRunArg::index_of() : result.emplace_back(&util::get(arg)); break; + case GRunArg::index_of() : result.emplace_back(&util::get(arg)); break; + #endif // !defined(GAPI_STANDALONE) + case GRunArg::index_of() : result.emplace_back(&util::get (arg)); break; + case GRunArg::index_of() : result.emplace_back(&util::get(arg)); break; + case GRunArg::index_of() : result.emplace_back(util::get (arg)); break; + default : ; + } + } + return result; + } +} + REGISTER_TYPED_TEST_CASE_P(cancel, basic); +template +struct output_args_lifetime : ::testing::Test{ + static constexpr const int num_of_requests = 20; +}; +TYPED_TEST_CASE_P(output_args_lifetime); +//There are intentionaly no actual checks (asserts and verify) in output_args_lifetime tests. +//They are more of example use-cases than real tests. (ASAN/valgrind can still catch issues here) +TYPED_TEST_P(output_args_lifetime, callback){ + + std::atomic active_requests = {0}; + + for (int i=0; inum_of_requests; i++) + { + TypeParam r; + + //As output arguments are __captured by reference__ calling code + //__must__ ensure they live long enough to complete asynchronous activity. + //(i.e. live at least until callback is called) + auto out_args_ptr = std::make_shared(deep_copy_out_args(r.out_args())); + + //Extend lifetime of out_args_ptr content by capturing it into a callback + auto cb = [&active_requests, out_args_ptr](std::exception_ptr ){ + --active_requests; + }; + + ++active_requests; + + r.async(cb, r.in_args(), args_p_from_args(*out_args_ptr)); + } + + + while(active_requests){ + std::this_thread::sleep_for(std::chrono::milliseconds{2}); + } +} + + +TYPED_TEST_P(output_args_lifetime, future){ + + std::vector> fs(this->num_of_requests); + std::vector> out_ptrs(this->num_of_requests); + + for (int i=0; inum_of_requests; i++) + { + TypeParam r; + + //As output arguments are __captured by reference__ calling code + //__must__ ensure they live long enough to complete asynchronous activity. + //(i.e. live at least until future.get()/wait() is returned) + auto out_args_ptr = std::make_shared(deep_copy_out_args(r.out_args())); + + //Extend lifetime of out_args_ptr content + out_ptrs[i] = out_args_ptr; + + fs[i] = r.async(r.in_args(), args_p_from_args(*out_args_ptr)); + } + + for (auto const& ftr : fs ){ + ftr.wait(); + } +} +REGISTER_TYPED_TEST_CASE_P(output_args_lifetime, callback, future); + //little helpers to match up all combinations of setups -template class callback_or_future_t, template class compiled_or_apply_t> +template class... args_t> struct Case : compute_fixture_t, - callback_or_future_t>, - compiled_or_apply_t > + args_t> ... { template Case(Args&&... args) : compute_fixture_t(std::forward(args)...) { } @@ -404,6 +505,7 @@ using cases = ::testing::Types< Case, Case >; + INSTANTIATE_TYPED_TEST_CASE_P(AsyncAPINormalFlow_, normal, cases); INSTANTIATE_TYPED_TEST_CASE_P(AsyncAPIExceptionHandling_, exception, cases); @@ -411,19 +513,14 @@ INSTANTIATE_TYPED_TEST_CASE_P(AsyncAPIStress, stress, cases); -TEST(AsyncAPI, Sample){ - cv::GComputation self_mul([]{ - cv::GMat in; - cv::GMat out = cv::gapi::mul(in, in); - return GComputation{in, out}; - }); - - const cv::Size sz{2, 2}; - cv::Mat in_mat{sz, CV_8U, cv::Scalar(1)}; - cv::Mat out; +template +using explicit_wait_cases = ::testing::Types< + Case, + Case, + Case, + Case + >; - auto f = cv::gapi::wip::async_apply(self_mul,cv::gin(in_mat), cv::gout(out)); - f.wait(); -} +INSTANTIATE_TYPED_TEST_CASE_P(AsyncAPIOutArgsLifetTime, output_args_lifetime, explicit_wait_cases); } // namespace opencv_test