diff --git a/modules/gapi/cmake/standalone.cmake b/modules/gapi/cmake/standalone.cmake index 9dd5540643..e8dbcaae45 100644 --- a/modules/gapi/cmake/standalone.cmake +++ b/modules/gapi/cmake/standalone.cmake @@ -37,6 +37,7 @@ set_property(TARGET ${FLUID_TARGET} PROPERTY CXX_STANDARD 11) if(MSVC) target_compile_options(${FLUID_TARGET} PUBLIC "/wd4251") + target_compile_options(${FLUID_TARGET} PUBLIC "/wd4275") target_compile_definitions(${FLUID_TARGET} PRIVATE _CRT_SECURE_NO_DEPRECATE) endif() diff --git a/modules/gapi/include/opencv2/gapi/gasync_context.hpp b/modules/gapi/include/opencv2/gapi/gasync_context.hpp new file mode 100644 index 0000000000..3e01577bb5 --- /dev/null +++ b/modules/gapi/include/opencv2/gapi/gasync_context.hpp @@ -0,0 +1,38 @@ +// This file is part of OpenCV project. +// It is subject to the license terms in the LICENSE file found in the top-level directory +// of this distribution and at http://opencv.org/license.html. +// +// Copyright (C) 2019 Intel Corporation + +#ifndef OPENCV_GAPI_GASYNC_CONTEXT_HPP +#define OPENCV_GAPI_GASYNC_CONTEXT_HPP + +#if !defined(GAPI_STANDALONE) +# include +#else // Without OpenCV +# include +#endif // !defined(GAPI_STANDALONE) + +#include + +namespace cv { +namespace gapi{ +namespace wip { + +class GAPI_EXPORTS GAsyncContext{ + std::atomic cancelation_requested = {false}; +public: + //returns true if it was a first request to cancel the context + bool cancel(); + bool isCanceled() const; +}; + +class GAPI_EXPORTS GAsyncCanceled : public std::exception { +public: + virtual const char* what() const noexcept CV_OVERRIDE; +}; +} // namespace wip +} // namespace gapi +} // namespace cv + +#endif //OPENCV_GAPI_GASYNC_CONTEXT_HPP diff --git a/modules/gapi/include/opencv2/gapi/gcompiled_async.hpp b/modules/gapi/include/opencv2/gapi/gcompiled_async.hpp index 924380d979..559f653217 100644 --- a/modules/gapi/include/opencv2/gapi/gcompiled_async.hpp +++ b/modules/gapi/include/opencv2/gapi/gcompiled_async.hpp @@ -12,6 +12,7 @@ #include //for std::exception_ptr #include //for std::function #include +#include namespace cv { //fwd declaration @@ -19,13 +20,17 @@ namespace cv { namespace gapi{ namespace wip { + class GAsyncContext; //These functions asynchronously (i.e. probably on a separate thread of execution) call operator() member function of their first argument with copies of rest of arguments (except callback) passed in. //The difference between the function is the way to get the completion notification (via callback or a waiting on std::future object) //If exception is occurred during execution of apply it is transfered to the callback (via function parameter) or passed to future (and will be thrown on call to std::future::get) GAPI_EXPORTS void async(GCompiled& gcmpld, std::function&& callback, GRunArgs &&ins, GRunArgsP &&outs); + GAPI_EXPORTS void async(GCompiled& gcmpld, std::function&& callback, GRunArgs &&ins, GRunArgsP &&outs, GAsyncContext& ctx); + GAPI_EXPORTS std::future async(GCompiled& gcmpld, GRunArgs &&ins, GRunArgsP &&outs); -} // namespace gapi + GAPI_EXPORTS std::future async(GCompiled& gcmpld, GRunArgs &&ins, GRunArgsP &&outs, GAsyncContext& ctx); } // namespace wip +} // namespace gapi } // namespace cv #endif // OPENCV_GAPI_GCOMPILED_ASYNC_HPP diff --git a/modules/gapi/include/opencv2/gapi/gcomputation_async.hpp b/modules/gapi/include/opencv2/gapi/gcomputation_async.hpp index 1f76dd0aa8..7444abe7eb 100644 --- a/modules/gapi/include/opencv2/gapi/gcomputation_async.hpp +++ b/modules/gapi/include/opencv2/gapi/gcomputation_async.hpp @@ -8,24 +8,30 @@ #define OPENCV_GAPI_GCOMPUTATION_ASYNC_HPP -#include +#include //for std::future #include //for std::exception_ptr #include //for std::function #include //for GRunArgs, GRunArgsP #include //for GCompileArgs +#include + namespace cv { //fwd declaration class GComputation; namespace gapi { namespace wip { + class GAsyncContext; //These functions asynchronously (i.e. probably on a separate thread of execution) call apply member function of their first argument with copies of rest of arguments (except callback) passed in. //The difference between the function is the way to get the completion notification (via callback or a waiting on std::future object) //If exception is occurred during execution of apply it is transfered to the callback (via function parameter) or passed to future (and will be thrown on call to std::future::get) GAPI_EXPORTS void async_apply(GComputation& gcomp, std::function&& callback, GRunArgs &&ins, GRunArgsP &&outs, GCompileArgs &&args = {}); + GAPI_EXPORTS void async_apply(GComputation& gcomp, std::function&& callback, GRunArgs &&ins, GRunArgsP &&outs, GCompileArgs &&args, GAsyncContext& ctx); + GAPI_EXPORTS std::future async_apply(GComputation& gcomp, GRunArgs &&ins, GRunArgsP &&outs, GCompileArgs &&args = {}); -} // nmaepspace gapi + GAPI_EXPORTS std::future async_apply(GComputation& gcomp, GRunArgs &&ins, GRunArgsP &&outs, GCompileArgs &&args, GAsyncContext& ctx); } // namespace wip +} // namespace gapi } // namespace cv diff --git a/modules/gapi/include/opencv2/gapi/own/cvdefs.hpp b/modules/gapi/include/opencv2/gapi/own/cvdefs.hpp index e110536929..71c2aa8e2d 100644 --- a/modules/gapi/include/opencv2/gapi/own/cvdefs.hpp +++ b/modules/gapi/include/opencv2/gapi/own/cvdefs.hpp @@ -108,6 +108,10 @@ typedef unsigned short ushort; #define CV_ELEM_SIZE(type) \ (CV_MAT_CN(type) << ((((sizeof(size_t)/4+1)*16384|0x3a50) >> CV_MAT_DEPTH(type)*2) & 3)) +#ifndef CV_OVERRIDE +# define CV_OVERRIDE override +#endif + // base.h: namespace cv { diff --git a/modules/gapi/src/executor/gasync.cpp b/modules/gapi/src/executor/gasync.cpp index a66563b74f..b92dbdcec4 100644 --- a/modules/gapi/src/executor/gasync.cpp +++ b/modules/gapi/src/executor/gasync.cpp @@ -4,10 +4,12 @@ // // Copyright (C) 2019 Intel Corporation + #include #include #include #include +#include #include @@ -19,11 +21,11 @@ namespace { //This is a tool to move initialize captures of a lambda in C++11 template - struct move_through_copy{ + struct copy_through_move{ T value; - move_through_copy(T&& g) : value(std::move(g)) {} - move_through_copy(move_through_copy&&) = default; - move_through_copy(move_through_copy const& lhs) : move_through_copy(std::move(const_cast(lhs))) {} + copy_through_move(T&& g) : value(std::move(g)) {} + copy_through_move(copy_through_move&&) = default; + copy_through_move(copy_through_move const& lhs) : copy_through_move(std::move(const_cast(lhs))) {} }; } @@ -80,6 +82,7 @@ public: }}; } } + std::unique_lock lck{mtx}; bool first_task = q.empty(); q.push(std::move(t)); @@ -108,8 +111,12 @@ async_service the_ctx; } namespace { -template -std::exception_ptr call_and_catch(f_t&& f){ +template +std::exception_ptr call_and_catch(f_t&& f, context_t&& ctx){ + if (std::forward(ctx).isCanceled()){ + return std::make_exception_ptr(GAsyncCanceled{}); + } + std::exception_ptr eptr; try { std::forward(f)(); @@ -120,15 +127,21 @@ std::exception_ptr call_and_catch(f_t&& f){ return eptr; } -template -void call_with_callback(f_t&& f, callback_t&& cb){ - auto eptr = call_and_catch(std::forward(f)); +struct DummyContext { + bool isCanceled() const { + return false; + } +}; + +template +void call_with_callback(f_t&& f, callback_t&& cb, context_t&& ctx){ + auto eptr = call_and_catch(std::forward(f), std::forward(ctx)); std::forward(cb)(eptr); } -template -void call_with_futute(f_t&& f, std::promise& p){ - auto eptr = call_and_catch(std::forward(f)); +template +void call_with_future(f_t&& f, std::promise& p, context_t&& ctx){ + auto eptr = call_and_catch(std::forward(f), std::forward(ctx)); if (eptr){ p.set_exception(eptr); } @@ -138,56 +151,126 @@ void call_with_futute(f_t&& f, std::promise& p){ } }//namespace +bool GAsyncContext::cancel(){ + bool expected = false; + bool updated = cancelation_requested.compare_exchange_strong(expected, true); + return updated; +} + +bool GAsyncContext::isCanceled() const { + return cancelation_requested.load(); +} + +const char* GAsyncCanceled::what() const noexcept { + return "GAPI asynchronous operation was canceled"; +} + //For now these async functions are simply wrapping serial version of apply/operator() into a functor. //These functors are then serialized into single queue, which is processed by a devoted background thread. void async_apply(GComputation& gcomp, std::function&& callback, GRunArgs &&ins, GRunArgsP &&outs, GCompileArgs &&args){ - //TODO: use move_through_copy for all args except gcomp + //TODO: use copy_through_move for all args except gcomp + //TODO: avoid code duplication between versions of "async" functions auto l = [=]() mutable { auto apply_l = [&](){ gcomp.apply(std::move(ins), std::move(outs), std::move(args)); }; - call_with_callback(apply_l,std::move(callback)); + call_with_callback(apply_l,std::move(callback), DummyContext{}); }; impl::the_ctx.add_task(l); } std::future async_apply(GComputation& gcomp, GRunArgs &&ins, GRunArgsP &&outs, GCompileArgs &&args){ - move_through_copy> prms{{}}; + copy_through_move> prms{{}}; auto f = prms.value.get_future(); auto l = [=]() mutable { auto apply_l = [&](){ gcomp.apply(std::move(ins), std::move(outs), std::move(args)); }; - call_with_futute(apply_l, prms.value); + call_with_future(apply_l, prms.value, DummyContext{}); }; impl::the_ctx.add_task(l); return f; } +void async_apply(GComputation& gcomp, std::function&& callback, GRunArgs &&ins, GRunArgsP &&outs, GCompileArgs &&args, GAsyncContext& ctx){ + //TODO: use copy_through_move for all args except gcomp + auto l = [=, &ctx]() mutable { + auto apply_l = [&](){ + gcomp.apply(std::move(ins), std::move(outs), std::move(args)); + }; + + call_with_callback(apply_l,std::move(callback), ctx); + }; + impl::the_ctx.add_task(l); +} + +std::future async_apply(GComputation& gcomp, GRunArgs &&ins, GRunArgsP &&outs, GCompileArgs &&args, GAsyncContext& ctx){ + copy_through_move> prms{{}}; + auto f = prms.value.get_future(); + auto l = [=, &ctx]() mutable { + auto apply_l = [&](){ + gcomp.apply(std::move(ins), std::move(outs), std::move(args)); + }; + + call_with_future(apply_l, prms.value, ctx); + }; + + impl::the_ctx.add_task(l); + return f; + +} + void async(GCompiled& gcmpld, std::function&& callback, GRunArgs &&ins, GRunArgsP &&outs){ auto l = [=]() mutable { auto apply_l = [&](){ gcmpld(std::move(ins), std::move(outs)); }; - call_with_callback(apply_l,std::move(callback)); + call_with_callback(apply_l,std::move(callback), DummyContext{}); + }; + + impl::the_ctx.add_task(l); +} + +void async(GCompiled& gcmpld, std::function&& callback, GRunArgs &&ins, GRunArgsP &&outs, GAsyncContext& ctx){ + auto l = [=, &ctx]() mutable { + auto apply_l = [&](){ + gcmpld(std::move(ins), std::move(outs)); + }; + + call_with_callback(apply_l,std::move(callback), ctx); }; impl::the_ctx.add_task(l); } std::future async(GCompiled& gcmpld, GRunArgs &&ins, GRunArgsP &&outs){ - move_through_copy> prms{{}}; + copy_through_move> prms{{}}; auto f = prms.value.get_future(); auto l = [=]() mutable { auto apply_l = [&](){ gcmpld(std::move(ins), std::move(outs)); }; - call_with_futute(apply_l, prms.value); + call_with_future(apply_l, prms.value, DummyContext{}); + }; + + impl::the_ctx.add_task(l); + return f; + +} +std::future async(GCompiled& gcmpld, GRunArgs &&ins, GRunArgsP &&outs, GAsyncContext& ctx){ + copy_through_move> prms{{}}; + auto f = prms.value.get_future(); + auto l = [=, &ctx]() mutable { + auto apply_l = [&](){ + gcmpld(std::move(ins), std::move(outs)); + }; + + call_with_future(apply_l, prms.value, ctx); }; impl::the_ctx.add_task(l); diff --git a/modules/gapi/test/gapi_async_test.cpp b/modules/gapi/test/gapi_async_test.cpp index 4ad4c597d0..d6fc5935dc 100644 --- a/modules/gapi/test/gapi_async_test.cpp +++ b/modules/gapi/test/gapi_async_test.cpp @@ -8,6 +8,8 @@ #include "test_precomp.hpp" #include #include +#include + #include #include @@ -78,6 +80,32 @@ namespace { } } }; + + + //TODO: unify with callback helper code + struct cancel_struct { + std::atomic num_tasks_to_spawn; + + cv::gapi::wip::GAsyncContext ctx; + + cancel_struct(int tasks_to_spawn) : num_tasks_to_spawn(tasks_to_spawn) {} + }; + + G_TYPED_KERNEL(GCancelationAdHoc, , "org.opencv.test.cancel_ad_hoc") + { + static GMatDesc outMeta(GMatDesc in, cancel_struct* ) { return in; } + + }; + + GAPI_OCV_KERNEL(GCancelationAdHocImpl, GCancelationAdHoc) + { + static void run(const cv::Mat& , cancel_struct* cancel_struct_p, cv::Mat&) { + auto& cancel_struct_ = * cancel_struct_p; + auto num_tasks_to_spawn = -- cancel_struct_.num_tasks_to_spawn; + cancel_struct_.ctx.cancel(); + EXPECT_GT(num_tasks_to_spawn, 0)<<"Incorrect Test setup - to small number of tasks to feed the queue \n"; + } + }; } struct ExceptionOnExecution { @@ -117,6 +145,41 @@ struct ExceptionOnExecution { }; +struct SelfCanceling { + cv::GComputation self_cancel; + SelfCanceling(cancel_struct* cancel_struct_p) : self_cancel([cancel_struct_p]{ + cv::GMat in; + cv::GMat out = GCancelationAdHoc::on(in, cancel_struct_p); + return GComputation{in, out}; + }) + {} + + const cv::Size sz{2, 2}; + cv::Mat in_mat{sz, CV_8U, cv::Scalar(1)}; + cv::Mat out_mat; + + cv::GCompiled compile(){ + return self_cancel.compile(descr_of(in_mat), compile_args()); + } + + cv::GComputation& computation(){ + return self_cancel; + } + + cv::GRunArgs in_args(){ + return cv::gin(in_mat); + } + + cv::GRunArgsP out_args(){ + return cv::gout(out_mat); + } + + cv::GCompileArgs compile_args(){ + auto pkg = cv::gapi::kernels(); + return cv::compile_args(pkg); + } +}; + template struct crtp_cast { template @@ -150,6 +213,11 @@ struct CallBack: crtp_cast { this->crtp_cast_(this)->async(callback(), std::forward(args)...); } + template + void start_async(cv::gapi::wip::GAsyncContext& ctx, Args&&... args){ + this->crtp_cast_(this)->async(ctx, callback(), std::forward(args)...); + } + void wait_for_result() { std::unique_lock lck{mtx}; @@ -186,6 +254,14 @@ struct AsyncCompiled : crtp_cast{ auto gcmpld = this->crtp_cast_(this)->compile(); return cv::gapi::wip::async(gcmpld, std::forward(args)...); } + + template + auto async(cv::gapi::wip::GAsyncContext& ctx, Args&&... args) -> + decltype(cv::gapi::wip::async(std::declval(), std::forward(args)..., std::declval())) + { + auto gcmpld = this->crtp_cast_(this)->compile(); + return cv::gapi::wip::async(gcmpld, std::forward(args)..., ctx); + } }; //Test Mixin, hiding details of calling apply (async_apply) on GAPI Computation object @@ -193,9 +269,23 @@ template struct AsyncApply : crtp_cast { template - auto async(Args&&... args) ->decltype(cv::gapi::wip::async_apply(std::declval(), std::forward(args)...)) { - return cv::gapi::wip::async_apply(this->crtp_cast_(this)->computation(), std::forward(args)..., this->crtp_cast_(this)->compile_args()); + auto async(Args&&... args) -> + decltype(cv::gapi::wip::async_apply(std::declval(), std::forward(args)..., std::declval())) + { + return cv::gapi::wip::async_apply( + this->crtp_cast_(this)->computation(), std::forward(args)..., this->crtp_cast_(this)->compile_args() + ); } + + template + auto async(cv::gapi::wip::GAsyncContext& ctx, Args&&... args) -> + decltype(cv::gapi::wip::async_apply(std::declval(), std::forward(args)... , std::declval(), std::declval())) + { + return cv::gapi::wip::async_apply( + this->crtp_cast_(this)->computation(), std::forward(args)..., this->crtp_cast_(this)->compile_args(), ctx + ); + } + }; @@ -240,7 +330,7 @@ TYPED_TEST_P(stress, test){ const std::size_t number_of_threads = 4; auto thread_body = [&](){ - std::vector requests{request_per_thread}; + std::vector requests(request_per_thread); for (auto&& r : requests){ r.start_async(r.in_args(), r.out_args()); } @@ -262,13 +352,50 @@ TYPED_TEST_P(stress, test){ } REGISTER_TYPED_TEST_CASE_P(stress, test); +template +struct cancel : ::testing::Test{}; +TYPED_TEST_CASE_P(cancel); + +TYPED_TEST_P(cancel, basic){ + constexpr int num_tasks = 100; + cancel_struct cancel_struct_ {num_tasks}; + std::vector requests; requests.reserve(num_tasks); + + for (auto i = num_tasks; i>0; i--){ + requests.emplace_back(&cancel_struct_); + } + for (auto&& r : requests){ + //first request will cancel other on it's execution + r.start_async(cancel_struct_.ctx, r.in_args(), r.out_args()); + } + + unsigned int canceled = 0 ; + for (auto&& r : requests){ + try { + r.wait_for_result(); + }catch (cv::gapi::wip::GAsyncCanceled&){ + ++canceled; + } + } + ASSERT_GT(canceled, 0u); +} + +REGISTER_TYPED_TEST_CASE_P(cancel, basic); + //little helpers to match up all combinations of setups template class callback_or_future_t, template class compiled_or_apply_t> struct Case : compute_fixture_t, callback_or_future_t>, compiled_or_apply_t > -{}; +{ + template + Case(Args&&... args) : compute_fixture_t(std::forward(args)...) { } + Case(Case const & ) = default; + Case(Case && ) = default; + + Case() = default; +}; template using cases = ::testing::Types< @@ -282,6 +409,8 @@ INSTANTIATE_TYPED_TEST_CASE_P(AsyncAPIExceptionHandling_, exception, cases); +INSTANTIATE_TYPED_TEST_CASE_P(AsyncAPICancelation, cancel, cases); + TEST(AsyncAPI, Sample){ cv::GComputation self_mul([]{ cv::GMat in; @@ -296,4 +425,5 @@ TEST(AsyncAPI, Sample){ auto f = cv::gapi::wip::async_apply(self_mul,cv::gin(in_mat), cv::gout(out)); f.wait(); } + } // namespace opencv_test