Reimplement thread management functions:

* Refactor auto-detection of parallel frameworks
* Implement cv::getNumThreads, cv::setNumThreads and cv::getThreadNum for all supported frameworks
* From now on, cv::setNumThreads(0) can be used to turn off parallelisation
pull/42/head
Andrey Kamaev 13 years ago
parent b54f59de90
commit 460644b8a4
  1. 301
      modules/core/src/parallel.cpp

@ -42,128 +42,196 @@
#include "precomp.hpp"
#if !defined HAVE_TBB && !defined HAVE_OPENMP && !defined HAVE_GCD && !defined HAVE_CONCURRENCY && !defined HAVE_CSTRIPES
#ifdef __APPLE__
#define HAVE_GCD
#elif defined _MSC_VER && _MSC_VER >= 1600
#define HAVE_CONCURRENCY
#endif
#ifdef _OPENMP
#define HAVE_OPENMP
#endif
#ifdef HAVE_CONCURRENCY
#include <ppl.h>
#elif defined HAVE_OPENMP
#include <omp.h>
#elif defined HAVE_GCD
#include <dispatch/dispatch.h>
#elif defined HAVE_TBB
#ifdef __APPLE__
#define HAVE_GCD
#endif
#if defined _MSC_VER && _MSC_VER >= 1600
#define HAVE_CONCURRENCY
#endif
/* IMPORTANT: always use the same order of defines
1. HAVE_TBB - 3rdparty library, should be explicitly enabled
2. HAVE_CSTRIPES - 3rdparty library, should be explicitly enabled
3. HAVE_OPENMP - integrated to compiler, should be explicitly enabled
4. HAVE_GCD - system wide, used automatically (APPLE only)
5. HAVE_CONCURRENCY - part of runtime, used automatically (Windows only - MSVS 10, MSVS 11)
*/
#if defined HAVE_TBB
#include "tbb/tbb_stddef.h"
#if TBB_VERSION_MAJOR*100 + TBB_VERSION_MINOR >= 202
#include "tbb/tbb.h"
#include "tbb/task.h"
#if TBB_INTERFACE_VERSION >= 6100
#include "tbb/task_arena.h"
#endif
#undef min
#undef max
#else
#undef HAVE_TBB
#endif // end TBB version
#elif defined HAVE_CSTRIPES
#include "C=.h"
#undef shared
#endif
/*
HAVE_TBB - using TBB
HAVE_GCD - using GCD
HAVE_OPENMP - using OpenMP
HAVE_CONCURRENCY - using visual studio 2010 concurrency
*/
#ifndef HAVE_TBB
#if defined HAVE_CSTRIPES
#include "C=.h"
#undef shared
#elif defined HAVE_OPENMP
#include <omp.h>
#elif defined HAVE_GCD
#include <dispatch/dispatch.h>
#include <pthread.h>
#elif defined HAVE_CONCURRENCY
#include <ppl.h>
#endif
#endif
#if defined HAVE_TBB || defined HAVE_CSTRIPES || defined HAVE_OPENMP || defined HAVE_GCD || defined HAVE_CONCURRENCY
#define HAVE_PARALLEL_FRAMEWORK
#endif
namespace cv
{
ParallelLoopBody::~ParallelLoopBody() {}
}
namespace
{
#ifdef HAVE_PARALLEL_FRAMEWORK
class ParallelLoopBodyWrapper
{
public:
ParallelLoopBodyWrapper(const ParallelLoopBody& _body, const Range& _r, double _nstripes)
ParallelLoopBodyWrapper(const cv::ParallelLoopBody& _body, const cv::Range& _r, double _nstripes)
{
body = &_body;
wholeRange = _r;
double len = wholeRange.end - wholeRange.start;
nstripes = cvRound(_nstripes < 0 ? len : MIN(MAX(_nstripes, 1.), len));
nstripes = cvRound(_nstripes <= 0 ? len : MIN(MAX(_nstripes, 1.), len));
}
void operator()(const Range& sr) const
void operator()(const cv::Range& sr) const
{
Range r;
cv::Range r;
r.start = (int)(wholeRange.start +
((size_t)sr.start*(wholeRange.end - wholeRange.start) + nstripes/2)/nstripes);
r.end = sr.end >= nstripes ? wholeRange.end : (int)(wholeRange.start +
((size_t)sr.end*(wholeRange.end - wholeRange.start) + nstripes/2)/nstripes);
(*body)(r);
}
Range stripeRange() const { return Range(0, nstripes); }
cv::Range stripeRange() const { return cv::Range(0, nstripes); }
protected:
const ParallelLoopBody* body;
Range wholeRange;
const cv::ParallelLoopBody* body;
cv::Range wholeRange;
int nstripes;
};
ParallelLoopBody::~ParallelLoopBody() {}
#if defined HAVE_TBB
class ProxyLoopBody : public ParallelLoopBodyWrapper
{
public:
ProxyLoopBody(const ParallelLoopBody& _body, const Range& _r, double _nstripes)
ProxyLoopBody(const cv::ParallelLoopBody& _body, const cv::Range& _r, double _nstripes)
: ParallelLoopBodyWrapper(_body, _r, _nstripes)
{}
void operator ()(const tbb::blocked_range<int>& range) const
{
this->ParallelLoopBodyWrapper::operator()(Range(range.begin(), range.end()));
this->ParallelLoopBodyWrapper::operator()(cv::Range(range.begin(), range.end()));
}
};
#elif defined HAVE_CSTRIPES || defined HAVE_OPENMP
typedef ParallelLoopBodyWrapper ProxyLoopBody;
#elif defined HAVE_GCD
typedef ParallelLoopBodyWrapper ProxyLoopBody;
static
void block_function(void* context, size_t index)
static void block_function(void* context, size_t index)
{
ProxyLoopBody* ptr_body = static_cast<ProxyLoopBody*>(context);
(*ptr_body)(Range(index, index + 1));
(*ptr_body)(cv::Range(index, index + 1));
}
#elif defined HAVE_CONCURRENCY
class ProxyLoopBody : public ParallelLoopBodyWrapper
{
public:
ProxyLoopBody(const ParallelLoopBody& _body, const Range& _r, double _nstripes)
ProxyLoopBody(const cv::ParallelLoopBody& _body, const cv::Range& _r, double _nstripes)
: ParallelLoopBodyWrapper(_body, _r, _nstripes)
{}
void operator ()(int i) const
{
this->ParallelLoopBodyWrapper::operator()(Range(i, i + 1));
this->ParallelLoopBodyWrapper::operator()(cv::Range(i, i + 1));
}
};
#else
typedef ParallelLoopBodyWrapper ProxyLoopBody;
#endif
void parallel_for_(const Range& range, const ParallelLoopBody& body, double nstripes)
static int numThreads = -1;
#if defined HAVE_TBB
static tbb::task_scheduler_init tbbScheduler(tbb::task_scheduler_init::deferred);
#elif defined HAVE_CSTRIPES
// nothing for C=
#elif defined HAVE_OPENMP
static int numThreadsMax = omp_get_max_threads();
#elif defined HAVE_GCD
// nothing for GCD
#elif defined HAVE_CONCURRENCY
// Small owning holder for a Concurrency::Scheduler pointer.
//
// Assigning a new pointer (or 0) calls Release() on the scheduler held so far,
// and the destructor does the same by assigning 0. The holder never calls
// Reference() itself, so it takes over whatever reference it is handed.
//
// Copying is disabled: an implicitly generated copy would duplicate the raw
// pointer and both objects would Release() the same scheduler.
class SchedPtr
{
    Concurrency::Scheduler* sched_;

    // Non-copyable: declared private and intentionally left undefined.
    SchedPtr(const SchedPtr&);
    SchedPtr& operator=(const SchedPtr&);
public:
    // Starts out empty (null scheduler).
    SchedPtr() : sched_(0) {}
    // Releases the held scheduler, if any, via the pointer assignment below.
    ~SchedPtr() { *this = 0; }

    // Raw access to the held scheduler; may be null.
    Concurrency::Scheduler* operator->() { return sched_; }
    // Allows null tests and comparisons such as `!ptr` and `ptr == 0`.
    operator Concurrency::Scheduler*() { return sched_; }

    // Replaces the held scheduler, releasing the previous one first.
    void operator=(Concurrency::Scheduler* sched)
    {
        if (sched_) sched_->Release();
        sched_ = sched;
    }
};
static SchedPtr pplScheduler;
#endif
#endif // HAVE_PARALLEL_FRAMEWORK
} //namespace
/* ================================ parallel_for_ ================================ */
void cv::parallel_for_(const cv::Range& range, const cv::ParallelLoopBody& body, double nstripes)
{
#ifdef HAVE_PARALLEL_FRAMEWORK
if(numThreads != 0)
{
ProxyLoopBody pbody(body, range, nstripes);
Range stripeRange = pbody.stripeRange();
cv::Range stripeRange = pbody.stripeRange();
#if defined HAVE_TBB
tbb::parallel_for(tbb::blocked_range<int>(stripeRange.start, stripeRange.end), pbody);
#elif defined HAVE_CONCURRENCY
#elif defined HAVE_CSTRIPES
Concurrency::parallel_for(stripeRange.start, stripeRange.end, pbody);
parallel(MAX(0, numThreads))
{
int offset = stripeRange.start;
int len = stripeRange.end - offset;
Range r(offset + CPX_RANGE_START(len), offset + CPX_RANGE_END(len));
pbody(r);
barrier();
}
#elif defined HAVE_OPENMP
#pragma omp parallel for schedule(dynamic)
#pragma omp parallel for schedule(dynamic)
for (int i = stripeRange.start; i < stripeRange.end; ++i)
pbody(Range(i, i + 1));
@ -172,69 +240,140 @@ namespace cv
dispatch_queue_t concurrent_queue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
dispatch_apply_f(stripeRange.end - stripeRange.start, concurrent_queue, &pbody, block_function);
#elif defined HAVE_CSTRIPES
#elif defined HAVE_CONCURRENCY
parallel()
if(!pplScheduler || pplScheduler->Id() == Concurrency::CurrentScheduler::Id())
{
int offset = stripeRange.start;
int len = stripeRange.end - offset;
Range r(offset + CPX_RANGE_START(len), offset + CPX_RANGE_END(len));
pbody(r);
barrier();
Concurrency::parallel_for(stripeRange.start, stripeRange.end, pbody);
}
else
{
pplScheduler->Attach();
Concurrency::parallel_for(stripeRange.start, stripeRange.end, pbody);
Concurrency::CurrentScheduler::Detach();
}
#else
pbody(stripeRange);
#error You have hacked and compiling with unsupported parallel framework
#endif
}
} // namespace cv
}
else
static int numThreads = 0;
static int numProcs = 0;
#endif // HAVE_PARALLEL_FRAMEWORK
{
(void)nstripes;
body(range);
}
}
int cv::getNumThreads(void)
{
if( !numProcs )
setNumThreads(0);
return numThreads;
}
#ifdef HAVE_PARALLEL_FRAMEWORK
if(numThreads == 0)
return 1;
void cv::setNumThreads( int
#ifdef _OPENMP
threads
#endif
)
{
if( !numProcs )
{
#ifdef _OPENMP
numProcs = omp_get_num_procs();
#if defined HAVE_TBB
return tbbScheduler.is_active()
? numThreads
: tbb::task_scheduler_init::default_num_threads();
#elif defined HAVE_CSTRIPES
return cv::getNumberOfCPUs();
#elif defined HAVE_OPENMP
return omp_get_max_threads();
#elif defined HAVE_GCD
return 512; // the GCD thread pool limit
#elif defined HAVE_CONCURRENCY
return 1 + (pplScheduler == 0
? Concurrency::CurrentScheduler::Get()->GetNumberOfVirtualProcessors()
: pplScheduler->GetNumberOfVirtualProcessors());
#else
numProcs = 1;
#endif
}
#ifdef _OPENMP
if( threads <= 0 )
threads = numProcs;
else
threads = MIN( threads, numProcs );
return 1;
#endif
}
// Sets the number of threads requested for parallel regions and forwards the
// request to whichever parallel framework was selected at compile time.
//
// threads - requested thread count. The value is stored in the file-level
//           numThreads; cv::parallel_for_ in this file tests numThreads != 0
//           before dispatching to the framework, so 0 turns parallel
//           execution off. How non-positive values are handled beyond that is
//           backend specific (see the branches below).
void cv::setNumThreads( int threads )
{
#ifdef HAVE_PARALLEL_FRAMEWORK
numThreads = threads;
#else
// no framework compiled in: the stored value is always 1
numThreads = 1;
#endif
#ifdef HAVE_TBB
// restart the TBB scheduler with the new size; for threads <= 0 it is
// terminated and left uninitialized
if(tbbScheduler.is_active()) tbbScheduler.terminate();
if(threads > 0) tbbScheduler.initialize(threads);
#elif defined HAVE_CSTRIPES
return; // nothing needed
#elif defined HAVE_OPENMP
// NOTE: numThreads has already been overwritten above, even when this early
// return is taken
if(omp_in_parallel())
return; // can't change number of openmp threads inside a parallel region
// a non-positive request restores numThreadsMax (the omp_get_max_threads()
// value captured when the static was initialized)
omp_set_num_threads(threads > 0 ? threads : numThreadsMax);
#elif defined HAVE_GCD
// unsupported
// there is only private dispatch_queue_set_width() and only for desktop
#elif defined HAVE_CONCURRENCY
if (threads <= 0)
{
// drop the custom scheduler (SchedPtr releases it on assignment)
pplScheduler = 0;
}
else if (threads == 1)
{
// Concurrency always uses >=2 threads, so we just disable it if 1 thread is requested
numThreads = 0;
}
// recreate the scheduler only when there is none yet or its size differs from
// the request; cv::getNumThreads reports 1 + virtual processors, hence the
// comparison against 1 + N here
else if (pplScheduler == 0 || 1 + pplScheduler->GetNumberOfVirtualProcessors() != (unsigned int)threads)
{
// min and max concurrency are both threads-1 (presumably the calling thread
// is counted as the remaining one - TODO confirm)
pplScheduler = Concurrency::Scheduler::Create(Concurrency::SchedulerPolicy(2,
Concurrency::PolicyElementKey::MinConcurrency, threads-1,
Concurrency::PolicyElementKey::MaxConcurrency, threads-1));
}
#endif
}
// Returns an identifier for the calling thread as reported by the parallel
// framework selected at compile time. Returns 0 when no framework is compiled
// in, or when TBB is used without task_arena support.
// The value is not guaranteed to be a dense zero-based index for every
// backend (see the GCD and Concurrency branches).
int cv::getThreadNum(void)
{
#ifdef _OPENMP
#if defined HAVE_TBB
#if TBB_INTERFACE_VERSION >= 6100 && defined TBB_PREVIEW_TASK_ARENA && TBB_PREVIEW_TASK_ARENA
return tbb::task_arena::current_slot();
#else
return 0;
#endif
#elif defined HAVE_CSTRIPES
return pix();
#elif defined HAVE_OPENMP
return omp_get_thread_num();
#elif defined HAVE_GCD
// pthread_t is an opaque handle, so it cannot be static_cast to int; go through
// void* and size_t instead. The result is derived from the handle and truncated
// to int, so it is neither zero-based nor guaranteed unique.
return (int)(size_t)(void*)pthread_self(); // no zero-based indexing
#elif defined HAVE_CONCURRENCY
return std::max(0, (int)Concurrency::Context::VirtualProcessorId()); // zero for master thread, unique number for others but not necessarily 1,2,3,...
#else
return 0;
#endif

Loading…
Cancel
Save