diff --git a/modules/core/src/parallel.cpp b/modules/core/src/parallel.cpp index e0594dbba5..872a69ca0b 100644 --- a/modules/core/src/parallel.cpp +++ b/modules/core/src/parallel.cpp @@ -42,128 +42,196 @@ #include "precomp.hpp" -#if !defined HAVE_TBB && !defined HAVE_OPENMP && !defined HAVE_GCD && !defined HAVE_CONCURRENCY && !defined HAVE_CSTRIPES - #ifdef __APPLE__ - #define HAVE_GCD - #elif defined _MSC_VER && _MSC_VER >= 1600 - #define HAVE_CONCURRENCY - #endif +#ifdef _OPENMP + #define HAVE_OPENMP #endif -#ifdef HAVE_CONCURRENCY - #include <ppl.h> -#elif defined HAVE_OPENMP - #include <omp.h> -#elif defined HAVE_GCD - #include <dispatch/dispatch.h> -#elif defined HAVE_TBB +#ifdef __APPLE__ + #define HAVE_GCD +#endif + +#if defined _MSC_VER && _MSC_VER >= 1600 + #define HAVE_CONCURRENCY +#endif + +/* IMPORTANT: always use the same order of defines + 1. HAVE_TBB - 3rdparty library, should be explicitly enabled + 2. HAVE_CSTRIPES - 3rdparty library, should be explicitly enabled + 3. HAVE_OPENMP - integrated to compiler, should be explicitly enabled + 4. HAVE_GCD - system wide, used automatically (APPLE only) + 5. HAVE_CONCURRENCY - part of runtime, used automatically (Windows only - MSVS 10, MSVS 11) +*/ + +#if defined HAVE_TBB #include "tbb/tbb_stddef.h" #if TBB_VERSION_MAJOR*100 + TBB_VERSION_MINOR >= 202 #include "tbb/tbb.h" #include "tbb/task.h" + #if TBB_INTERFACE_VERSION >= 6100 + #include "tbb/task_arena.h" + #endif #undef min #undef max #else #undef HAVE_TBB #endif // end TBB version -#elif defined HAVE_CSTRIPES - #include "C=.h" - #undef shared #endif -/* - HAVE_TBB - using TBB - HAVE_GCD - using GCD - HAVE_OPENMP - using OpenMP - HAVE_CONCURRENCY - using visual studio 2010 concurrency -*/ +#ifndef HAVE_TBB + #if defined HAVE_CSTRIPES + #include "C=.h" + #undef shared + #elif defined HAVE_OPENMP + #include <omp.h> + #elif defined HAVE_GCD + #include <dispatch/dispatch.h> + #include <pthread.h> + #elif defined HAVE_CONCURRENCY + #include <ppl.h> + #endif +#endif + +#if defined HAVE_TBB || defined HAVE_CSTRIPES || defined HAVE_OPENMP || defined HAVE_GCD || defined HAVE_CONCURRENCY + #define HAVE_PARALLEL_FRAMEWORK +#endif namespace cv { + ParallelLoopBody::~ParallelLoopBody() {} +} + +namespace +{ +#ifdef HAVE_PARALLEL_FRAMEWORK class ParallelLoopBodyWrapper { public: - ParallelLoopBodyWrapper(const ParallelLoopBody& _body, const Range& _r, double _nstripes) + ParallelLoopBodyWrapper(const cv::ParallelLoopBody& _body, const cv::Range& _r, double _nstripes) { body = &_body; wholeRange = _r; double len = wholeRange.end - wholeRange.start; - nstripes = cvRound(_nstripes < 0 ? len : MIN(MAX(_nstripes, 1.), len)); + nstripes = cvRound(_nstripes <= 0 ? len : MIN(MAX(_nstripes, 1.), len)); } - void operator()(const Range& sr) const + void operator()(const cv::Range& sr) const { - Range r; + cv::Range r; r.start = (int)(wholeRange.start + ((size_t)sr.start*(wholeRange.end - wholeRange.start) + nstripes/2)/nstripes); r.end = sr.end >= nstripes ? wholeRange.end : (int)(wholeRange.start + ((size_t)sr.end*(wholeRange.end - wholeRange.start) + nstripes/2)/nstripes); (*body)(r); } - Range stripeRange() const { return Range(0, nstripes); } + cv::Range stripeRange() const { return cv::Range(0, nstripes); } protected: - const ParallelLoopBody* body; - Range wholeRange; + const cv::ParallelLoopBody* body; + cv::Range wholeRange; int nstripes; }; - ParallelLoopBody::~ParallelLoopBody() {} - #if defined HAVE_TBB class ProxyLoopBody : public ParallelLoopBodyWrapper { public: - ProxyLoopBody(const ParallelLoopBody& _body, const Range& _r, double _nstripes) + ProxyLoopBody(const cv::ParallelLoopBody& _body, const cv::Range& _r, double _nstripes) : ParallelLoopBodyWrapper(_body, _r, _nstripes) {} void operator ()(const tbb::blocked_range<int>& range) const { - this->ParallelLoopBodyWrapper::operator()(Range(range.begin(), range.end())); + this->ParallelLoopBodyWrapper::operator()(cv::Range(range.begin(), range.end())); } }; +#elif defined HAVE_CSTRIPES || defined HAVE_OPENMP + typedef ParallelLoopBodyWrapper ProxyLoopBody; #elif defined HAVE_GCD - typedef ParallelLoopBodyWrapper ProxyLoopBody; - static - void block_function(void* context, size_t index) + static void block_function(void* context, size_t index) { ProxyLoopBody* ptr_body = static_cast<ProxyLoopBody*>(context); - (*ptr_body)(Range(index, index + 1)); + (*ptr_body)(cv::Range(index, index + 1)); } #elif defined HAVE_CONCURRENCY class ProxyLoopBody : public ParallelLoopBodyWrapper { public: - ProxyLoopBody(const ParallelLoopBody& _body, const Range& _r, double _nstripes) + ProxyLoopBody(const cv::ParallelLoopBody& _body, const cv::Range& _r, double _nstripes) : ParallelLoopBodyWrapper(_body, _r, _nstripes) {} void operator ()(int i) const { - this->ParallelLoopBodyWrapper::operator()(Range(i, i + 1)); + this->ParallelLoopBodyWrapper::operator()(cv::Range(i, i + 1)); } }; #else typedef ParallelLoopBodyWrapper ProxyLoopBody; #endif - void parallel_for_(const Range& range, const ParallelLoopBody& body, double nstripes) +static int numThreads = -1; + +#if defined HAVE_TBB +static tbb::task_scheduler_init tbbScheduler(tbb::task_scheduler_init::deferred); +#elif defined HAVE_CSTRIPES +// nothing for C= +#elif defined HAVE_OPENMP +static int numThreadsMax = omp_get_max_threads(); +#elif defined HAVE_GCD +// nothing for GCD +#elif defined HAVE_CONCURRENCY +class SchedPtr +{ + Concurrency::Scheduler* sched_; +public: + Concurrency::Scheduler* operator->() { return sched_; } + operator Concurrency::Scheduler*() { return sched_; } + + void operator=(Concurrency::Scheduler* sched) + { + if (sched_) sched_->Release(); + sched_ = sched; + } + + SchedPtr() : sched_(0) {} + ~SchedPtr() { *this = 0; } +}; +static SchedPtr pplScheduler; +#endif + +#endif // HAVE_PARALLEL_FRAMEWORK + +} //namespace + +/* ================================ parallel_for_ ================================ */ + +void cv::parallel_for_(const cv::Range& range, const cv::ParallelLoopBody& body, double nstripes) +{ +#ifdef HAVE_PARALLEL_FRAMEWORK + + if(numThreads != 0) { ProxyLoopBody pbody(body, range, nstripes); - Range stripeRange = pbody.stripeRange(); + cv::Range stripeRange = pbody.stripeRange(); #if defined HAVE_TBB tbb::parallel_for(tbb::blocked_range<int>(stripeRange.start, stripeRange.end), pbody); -#elif defined HAVE_CONCURRENCY +#elif defined HAVE_CSTRIPES - Concurrency::parallel_for(stripeRange.start, stripeRange.end, pbody); + parallel(MAX(0, numThreads)) + { + int offset = stripeRange.start; + int len = stripeRange.end - offset; + Range r(offset + CPX_RANGE_START(len), offset + CPX_RANGE_END(len)); + pbody(r); + barrier(); + } #elif defined HAVE_OPENMP -#pragma omp parallel for schedule(dynamic) + #pragma omp parallel for schedule(dynamic) for (int i = stripeRange.start; i < stripeRange.end; ++i) pbody(Range(i, i + 1)); @@ -172,69 +240,140 @@ namespace cv dispatch_queue_t concurrent_queue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0); dispatch_apply_f(stripeRange.end - stripeRange.start, concurrent_queue, &pbody, block_function); -#elif defined HAVE_CSTRIPES +#elif defined HAVE_CONCURRENCY - parallel() + if(!pplScheduler || pplScheduler->Id() == Concurrency::CurrentScheduler::Id()) { - int offset = stripeRange.start; - int len = stripeRange.end - offset; - Range r(offset + CPX_RANGE_START(len), offset + CPX_RANGE_END(len)); - pbody(r); - barrier(); + Concurrency::parallel_for(stripeRange.start, stripeRange.end, pbody); + } + else + { + pplScheduler->Attach(); + Concurrency::parallel_for(stripeRange.start, stripeRange.end, pbody); + Concurrency::CurrentScheduler::Detach(); } #else - pbody(stripeRange); +#error You have hacked and compiling with unsupported parallel framework #endif - } - -} // namespace cv + } + else -static int numThreads = 0; -static int numProcs = 0; +#endif // HAVE_PARALLEL_FRAMEWORK + { + (void)nstripes; + body(range); + } +} int cv::getNumThreads(void) { - if( !numProcs ) - setNumThreads(0); - return numThreads; -} +#ifdef HAVE_PARALLEL_FRAMEWORK + + if(numThreads == 0) + return 1; -void cv::setNumThreads( int -#ifdef _OPENMP - threads #endif - ) -{ - if( !numProcs ) - { -#ifdef _OPENMP - numProcs = omp_get_num_procs(); + +#if defined HAVE_TBB + + return tbbScheduler.is_active() + ? numThreads + : tbb::task_scheduler_init::default_num_threads(); + +#elif defined HAVE_CSTRIPES + + return cv::getNumberOfCPUs(); + +#elif defined HAVE_OPENMP + + return omp_get_max_threads(); + +#elif defined HAVE_GCD + + return 512; // the GCD thread pool limit + +#elif defined HAVE_CONCURRENCY + + return 1 + (pplScheduler == 0 + ? Concurrency::CurrentScheduler::Get()->GetNumberOfVirtualProcessors() + : pplScheduler->GetNumberOfVirtualProcessors()); + #else - numProcs = 1; -#endif - } -#ifdef _OPENMP - if( threads <= 0 ) - threads = numProcs; - else - threads = MIN( threads, numProcs ); + return 1; +#endif +} + +void cv::setNumThreads( int threads ) +{ +#ifdef HAVE_PARALLEL_FRAMEWORK numThreads = threads; -#else - numThreads = 1; +#endif + +#ifdef HAVE_TBB + + if(tbbScheduler.is_active()) tbbScheduler.terminate(); + if(threads > 0) tbbScheduler.initialize(threads); + +#elif defined HAVE_CSTRIPES + + return; // nothing needed + +#elif defined HAVE_OPENMP + + if(omp_in_parallel()) + return; // can't change number of openmp threads inside a parallel region + + omp_set_num_threads(threads > 0 ? threads : numThreadsMax); + +#elif defined HAVE_GCD + + // unsupported + // there is only private dispatch_queue_set_width() and only for desktop + +#elif defined HAVE_CONCURRENCY + + if (threads <= 0) + { + pplScheduler = 0; + } + else if (threads == 1) + { + // Concurrency always uses >=2 threads, so we just disable it if 1 thread is requested + numThreads = 0; + } + else if (pplScheduler == 0 || 1 + pplScheduler->GetNumberOfVirtualProcessors() != (unsigned int)threads) + { + pplScheduler = Concurrency::Scheduler::Create(Concurrency::SchedulerPolicy(2, + Concurrency::PolicyElementKey::MinConcurrency, threads-1, + Concurrency::PolicyElementKey::MaxConcurrency, threads-1)); + } + #endif } int cv::getThreadNum(void) { -#ifdef _OPENMP +#if defined HAVE_TBB + #if TBB_INTERFACE_VERSION >= 6100 && defined TBB_PREVIEW_TASK_ARENA && TBB_PREVIEW_TASK_ARENA + return tbb::task_arena::current_slot(); + #else + return 0; + #endif +#elif defined HAVE_CSTRIPES + return pix(); +#elif defined HAVE_OPENMP return omp_get_thread_num(); +#elif defined HAVE_GCD + return statc_cast<int>(pthread_self()); // no zero-based indexing +#elif defined HAVE_CONCURRENCY + return std::max(0, (int)Concurrency::Context::VirtualProcessorId()); // zero for master thread, unique number for others but not necessary 1,2,3,... #else return 0; #endif