Merge pull request #10691 from alalek:parallel_for_2018

pull/10735/head
Alexander Alekhin 7 years ago
commit f57630d92b
  1. 2
      modules/core/include/opencv2/core/utility.hpp
  2. 38
      modules/core/src/parallel.cpp
  3. 765
      modules/core/src/parallel_impl.cpp
  4. 17
      modules/core/src/parallel_impl.hpp
  5. 581
      modules/core/src/parallel_pthreads.cpp
  6. 3
      modules/ts/src/ts_perf.cpp

@ -233,6 +233,8 @@ CV_EXPORTS_W int getNumThreads();
/** @brief Returns the index of the currently executed thread within the current parallel region. Always /** @brief Returns the index of the currently executed thread within the current parallel region. Always
returns 0 if called outside of parallel region. returns 0 if called outside of parallel region.
@deprecated Current implementation doesn't corresponding to this documentation.
The exact meaning of the return value depends on the threading framework used by OpenCV library: The exact meaning of the return value depends on the threading framework used by OpenCV library:
- `TBB` - Unsupported with current 4.1 TBB release. Maybe will be supported in future. - `TBB` - Unsupported with current 4.1 TBB release. Maybe will be supported in future.
- `OpenMP` - The thread number, within the current team, of the calling thread. - `OpenMP` - The thread number, within the current team, of the calling thread.

@ -42,6 +42,7 @@
#include "precomp.hpp" #include "precomp.hpp"
#include <opencv2/core/utils/configuration.private.hpp>
#include <opencv2/core/utils/trace.private.hpp> #include <opencv2/core/utils/trace.private.hpp>
#if defined _WIN32 || defined WINCE #if defined _WIN32 || defined WINCE
@ -125,19 +126,15 @@
# define CV_PARALLEL_FRAMEWORK "pthreads" # define CV_PARALLEL_FRAMEWORK "pthreads"
#endif #endif
#include "parallel_impl.hpp"
using namespace cv; using namespace cv;
namespace cv namespace cv
{ {
ParallelLoopBody::~ParallelLoopBody() {} ParallelLoopBody::~ParallelLoopBody() {}
#ifdef HAVE_PTHREADS_PF
void parallel_for_pthreads(const cv::Range& range, const cv::ParallelLoopBody& body, double nstripes);
size_t parallel_pthreads_get_threads_num();
void parallel_pthreads_set_threads_num(int num);
#endif
} }
namespace namespace
{ {
#ifdef CV_PARALLEL_FRAMEWORK #ifdef CV_PARALLEL_FRAMEWORK
@ -558,10 +555,35 @@ int cv::getNumThreads(void)
#endif #endif
} }
void cv::setNumThreads( int threads ) namespace cv {
unsigned defaultNumberOfThreads()
{
#ifdef __ANDROID__
// many modern phones/tables have 4-core CPUs. Let's use no more
// than 2 threads by default not to overheat the devices
const unsigned int default_number_of_threads = 2;
#else
const unsigned int default_number_of_threads = (unsigned int)std::max(1, cv::getNumberOfCPUs());
#endif
unsigned result = default_number_of_threads;
static int config_num_threads = (int)utils::getConfigurationParameterSizeT("OPENCV_FOR_THREADS_NUM", 0);
if (config_num_threads)
{
result = (unsigned)std::max(1, config_num_threads);
//do we need upper limit of threads number?
}
return result;
}
}
void cv::setNumThreads( int threads_ )
{ {
(void)threads; (void)threads_;
#ifdef CV_PARALLEL_FRAMEWORK #ifdef CV_PARALLEL_FRAMEWORK
int threads = (threads_ < 0) ? defaultNumberOfThreads() : (unsigned)threads_;
numThreads = threads; numThreads = threads;
#endif #endif

@ -0,0 +1,765 @@
// This file is part of OpenCV project.
// It is subject to the license terms in the LICENSE file found in the top-level directory
// of this distribution and at http://opencv.org/license.html.
#include "precomp.hpp"
#include "parallel_impl.hpp"
#ifdef HAVE_PTHREADS_PF
#include <pthread.h>
#include <opencv2/core/utils/configuration.private.hpp>
#include <opencv2/core/utils/logger.defines.hpp>
//#undef CV_LOG_STRIP_LEVEL
//#define CV_LOG_STRIP_LEVEL CV_LOG_LEVEL_VERBOSE + 1
#include <opencv2/core/utils/logger.hpp>
//#define CV_PROFILE_THREADS 64
//#define getTickCount getCPUTickCount // use this if getTickCount() calls are expensive (and getCPUTickCount() is accurate)
//#define CV_USE_GLOBAL_WORKERS_COND_VAR // not effective on many-core systems (10+)
#ifdef CV_CXX11
#include <atomic>
#else
#include <unistd.h> // _POSIX_PRIORITY_SCHEDULING
#endif
// Spin lock's OS-level yield
#ifdef DECLARE_CV_YIELD
DECLARE_CV_YIELD
#endif
#ifndef CV_YIELD
# ifdef CV_CXX11
# include <thread>
# define CV_YIELD() std::this_thread::yield()
# elif defined(_POSIX_PRIORITY_SCHEDULING)
# include <sched.h>
# define CV_YIELD() sched_yield()
# else
# warning "Can't detect sched_yield() on the target platform. Specify CV_YIELD() definition via compiler flags."
# define CV_YIELD() /* no-op: works, but not effective */
# endif
#endif // CV_YIELD
// Spin lock's CPU-level yield (required for Hyper-Threading)
#ifdef DECLARE_CV_PAUSE
DECLARE_CV_PAUSE
#endif
#ifndef CV_PAUSE
#if defined __GNUC__ && (defined __i386__ || defined __x86_64__)
# define CV_PAUSE(v) do { for (int __delay = (v); __delay > 0; --__delay) { _mm_pause(); } } while (0)
# elif defined __GNUC__ && defined __aarch64__
# define CV_PAUSE(v) do { for (int __delay = (v); __delay > 0; --__delay) { asm volatile("yield" ::: "memory"); } } while (0)
# elif defined __GNUC__ && defined __arm__
# define CV_PAUSE(v) do { for (int __delay = (v); __delay > 0; --__delay) { asm volatile("" ::: "memory"); } } while (0)
# else
# warning "Can't detect 'pause' (CPU-yield) instruction on the target platform. Specify CV_PAUSE() definition via compiler flags."
# define CV_PAUSE(...) do { /* no-op: works, but not effective */ } while (0)
# endif
#endif // CV_PAUSE
namespace cv
{
static int CV_ACTIVE_WAIT_PAUSE_LIMIT = (int)utils::getConfigurationParameterSizeT("OPENCV_THREAD_POOL_ACTIVE_WAIT_PAUSE_LIMIT", 16); // iterations
static int CV_WORKER_ACTIVE_WAIT = (int)utils::getConfigurationParameterSizeT("OPENCV_THREAD_POOL_ACTIVE_WAIT_WORKER", 2000); // iterations
static int CV_MAIN_THREAD_ACTIVE_WAIT = (int)utils::getConfigurationParameterSizeT("OPENCV_THREAD_POOL_ACTIVE_WAIT_MAIN", 10000); // iterations
static int CV_WORKER_ACTIVE_WAIT_THREADS_LIMIT = (int)utils::getConfigurationParameterSizeT("OPENCV_THREAD_POOL_ACTIVE_WAIT_THREADS_LIMIT", 0); // number of real cores
class WorkerThread;
class ParallelJob;
class ThreadPool
{
public:
static ThreadPool& instance()
{
CV_SINGLETON_LAZY_INIT_REF(ThreadPool, new ThreadPool())
}
static void stop()
{
ThreadPool& manager = instance();
manager.reconfigure(0);
}
void reconfigure(unsigned new_threads_count)
{
if (new_threads_count == threads.size())
return;
pthread_mutex_lock(&mutex);
reconfigure_(new_threads_count);
pthread_mutex_unlock(&mutex);
}
bool reconfigure_(unsigned new_threads_count); // internal implementation
void run(const Range& range, const ParallelLoopBody& body, double nstripes);
size_t getNumOfThreads();
void setNumOfThreads(unsigned n);
ThreadPool();
~ThreadPool();
unsigned num_threads;
pthread_mutex_t mutex; // guards fields (job/threads) from non-worker threads (concurrent parallel_for calls)
#if defined(CV_USE_GLOBAL_WORKERS_COND_VAR)
pthread_cond_t cond_thread_wake;
#endif
pthread_mutex_t mutex_notify;
pthread_cond_t cond_thread_task_complete;
std::vector< Ptr<WorkerThread> > threads;
Ptr<ParallelJob> job;
#ifdef CV_PROFILE_THREADS
double tickFreq;
int64 jobSubmitTime;
struct ThreadStatistics
{
ThreadStatistics() : threadWait(0)
{
reset();
}
void reset()
{
threadWake = 0;
threadExecuteStart = 0;
threadExecuteStop = 0;
executedTasks = 0;
keepActive = false;
threadPing = getTickCount();
}
int64 threadWait; // don't reset by default
int64 threadPing; // don't reset by default
int64 threadWake;
int64 threadExecuteStart;
int64 threadExecuteStop;
int64 threadFree;
unsigned executedTasks;
bool keepActive;
int64 dummy_[8]; // separate cache lines
void dump(int id, int64 baseTime, double tickFreq)
{
if (id < 0)
std::cout << "Main: ";
else
printf("T%03d: ", id + 2);
printf("wait=% 10.1f ping=% 6.1f",
threadWait > 0 ? (threadWait - baseTime) / tickFreq * 1e6 : -0.0,
threadPing > 0 ? (threadPing - baseTime) / tickFreq * 1e6 : -0.0);
if (threadWake > 0)
printf(" wake=% 6.1f",
(threadWake > 0 ? (threadWake - baseTime) / tickFreq * 1e6 : -0.0));
if (threadExecuteStart > 0)
{
printf(" exec=% 6.1f - % 6.1f tasksDone=%5u free=% 6.1f",
(threadExecuteStart > 0 ? (threadExecuteStart - baseTime) / tickFreq * 1e6 : -0.0),
(threadExecuteStop > 0 ? (threadExecuteStop - baseTime) / tickFreq * 1e6 : -0.0),
executedTasks,
(threadFree > 0 ? (threadFree - baseTime) / tickFreq * 1e6 : -0.0));
if (id >= 0)
printf(" active=%s\n", keepActive ? "true" : "false");
else
printf("\n");
}
else
printf(" ------------------------------------------------------------------------------\n");
}
};
ThreadStatistics threads_stat[CV_PROFILE_THREADS]; // 0 - main thread, 1..N - worker threads
#endif
};
class WorkerThread
{
public:
ThreadPool& thread_pool;
unsigned id;
pthread_t posix_thread;
bool is_created;
volatile bool stop_thread;
volatile bool has_wake_signal;
volatile bool dont_wait;
Ptr<ParallelJob> job;
pthread_mutex_t mutex;
#if !defined(CV_USE_GLOBAL_WORKERS_COND_VAR)
volatile bool isActive;
pthread_cond_t cond_thread_wake;
#endif
WorkerThread(ThreadPool& thread_pool_, unsigned id_) :
thread_pool(thread_pool_),
id(id_),
posix_thread(0),
is_created(false),
stop_thread(false),
has_wake_signal(false),
dont_wait(false)
#if !defined(CV_USE_GLOBAL_WORKERS_COND_VAR)
, isActive(true)
#endif
{
CV_LOG_VERBOSE(NULL, 1, "MainThread: initializing new worker: " << id);
int res = pthread_mutex_init(&mutex, NULL);
if (res != 0)
{
CV_LOG_ERROR(NULL, id << ": Can't create thread mutex: res = " << res);
return;
}
#if !defined(CV_USE_GLOBAL_WORKERS_COND_VAR)
res = pthread_cond_init(&cond_thread_wake, NULL);
if (res != 0)
{
CV_LOG_ERROR(NULL, id << ": Can't create thread condition variable: res = " << res);
return;
}
#endif
res = pthread_create(&posix_thread, NULL, thread_loop_wrapper, (void*)this);
if (res != 0)
{
CV_LOG_ERROR(NULL, id << ": Can't spawn new thread: res = " << res);
}
else
{
is_created = true;
}
}
~WorkerThread()
{
CV_LOG_VERBOSE(NULL, 1, "MainThread: destroy worker thread: " << id);
if (is_created)
{
if (!stop_thread)
{
pthread_mutex_lock(&mutex); // to avoid signal miss due pre-check
stop_thread = true;
pthread_mutex_unlock(&mutex);
#if defined(CV_USE_GLOBAL_WORKERS_COND_VAR)
pthread_cond_broadcast(&thread_pool.cond_thread_wake);
#else
pthread_cond_signal(&cond_thread_wake);
#endif
}
pthread_join(posix_thread, NULL);
}
#if !defined(CV_USE_GLOBAL_WORKERS_COND_VAR)
pthread_cond_destroy(&cond_thread_wake);
#endif
pthread_mutex_destroy(&mutex);
}
void thread_body();
static void* thread_loop_wrapper(void* thread_object)
{
((WorkerThread*)thread_object)->thread_body();
return 0;
}
};
class ParallelJob
{
public:
ParallelJob(const ThreadPool& thread_pool_, const Range& range_, const ParallelLoopBody& body_, int nstripes_) :
thread_pool(thread_pool_),
body(body_),
range(range_),
nstripes((unsigned)nstripes_),
is_completed(false)
{
CV_LOG_VERBOSE(NULL, 5, "ParallelJob::ParallelJob(" << (void*)this << ")");
#ifdef CV_CXX11
current_task.store(0, std::memory_order_relaxed);
active_thread_count.store(0, std::memory_order_relaxed);
completed_thread_count.store(0, std::memory_order_relaxed);
#else
current_task = 0;
active_thread_count = 0;
completed_thread_count = 0;
#endif
dummy0_[0] = 0, dummy1_[0] = 0, dummy2_[0] = 0; // compiler warning
}
~ParallelJob()
{
CV_LOG_VERBOSE(NULL, 5, "ParallelJob::~ParallelJob(" << (void*)this << ")");
}
unsigned execute(bool is_worker_thread)
{
unsigned executed_tasks = 0;
const int task_count = range.size();
const int remaining_multiplier = std::min(nstripes,
std::max(
std::min(100u, thread_pool.num_threads * 4),
thread_pool.num_threads * 2
)); // experimental value
for (;;)
{
int chunk_size = std::max(1, (task_count - current_task) / remaining_multiplier);
#ifdef CV_CXX11
int id = current_task.fetch_add(chunk_size, std::memory_order_seq_cst);
#else
int id = (int)CV_XADD(&current_task, chunk_size);
#endif
if (id >= task_count)
break; // no more free tasks
executed_tasks += chunk_size;
int start_id = id;
int end_id = std::min(task_count, id + chunk_size);
CV_LOG_VERBOSE(NULL, 9, "Thread: job " << start_id << "-" << end_id);
//TODO: if (not pending exception)
{
body.operator()(Range(range.start + start_id, range.start + end_id));
}
if (is_worker_thread && is_completed)
{
CV_LOG_ERROR(NULL, "\t\t\t\tBUG! Job: " << (void*)this << " " << id << " " << active_thread_count << " " << completed_thread_count);
CV_Assert(!is_completed); // TODO Dbg this
}
}
return executed_tasks;
}
const ThreadPool& thread_pool;
const ParallelLoopBody& body;
const Range range;
const unsigned nstripes;
#ifdef CV_CXX11
std::atomic<int> current_task; // next free part of job
int64 dummy0_[8]; // avoid cache-line reusing for the same atomics
std::atomic<int> active_thread_count; // number of threads worked on this job
int64 dummy1_[8]; // avoid cache-line reusing for the same atomics
std::atomic<int> completed_thread_count; // number of threads completed any activities on this job
int64 dummy2_[8]; // avoid cache-line reusing for the same atomics
#else
/*CV_DECL_ALIGNED(64)*/ volatile int current_task; // next free part of job
int64 dummy0_[8]; // avoid cache-line reusing for the same atomics
/*CV_DECL_ALIGNED(64)*/ volatile int active_thread_count; // number of threads worked on this job
int64 dummy1_[8]; // avoid cache-line reusing for the same atomics
/*CV_DECL_ALIGNED(64)*/ volatile int completed_thread_count; // number of threads completed any activities on this job
int64 dummy2_[8]; // avoid cache-line reusing for the same atomics
#endif
volatile bool is_completed; // std::atomic_flag ?
// TODO exception handling
};
void WorkerThread::thread_body()
{
(void)cv::utils::getThreadID(); // notify OpenCV about new thread
CV_LOG_VERBOSE(NULL, 5, "Thread: new thread: " << id);
bool allow_active_wait = true;
#ifdef CV_PROFILE_THREADS
ThreadPool::ThreadStatistics& stat = thread_pool.threads_stat[id + 1];
#endif
while (!stop_thread)
{
CV_LOG_VERBOSE(NULL, 5, "Thread: ... loop iteration: allow_active_wait=" << allow_active_wait << " has_wake_signal=" << has_wake_signal << " dont_wait=" << dont_wait );
if (allow_active_wait && CV_WORKER_ACTIVE_WAIT > 0)
{
allow_active_wait = false;
for (int i = 0; i < CV_WORKER_ACTIVE_WAIT; i++)
{
if (has_wake_signal)
break;
if (CV_ACTIVE_WAIT_PAUSE_LIMIT > 0 && (i < CV_ACTIVE_WAIT_PAUSE_LIMIT || (i & 1)))
CV_PAUSE(16);
else
CV_YIELD();
}
}
pthread_mutex_lock(&mutex);
#ifdef CV_PROFILE_THREADS
stat.threadWait = getTickCount();
#endif
while (!has_wake_signal && !dont_wait) // to handle spurious wakeups
{
//CV_LOG_VERBOSE(NULL, 5, "Thread: wait (sleep) ...");
#if defined(CV_USE_GLOBAL_WORKERS_COND_VAR)
pthread_cond_wait(&thread_pool.cond_thread_wake, &mutex);
#else
isActive = false;
pthread_cond_wait(&cond_thread_wake, &mutex);
isActive = true;
#endif
CV_LOG_VERBOSE(NULL, 5, "Thread: wake ... (has_wake_signal=" << has_wake_signal << " stop_thread=" << stop_thread << ")")
}
dont_wait = false;
#ifdef CV_PROFILE_THREADS
stat.threadWake = getTickCount();
#endif
if (!stop_thread)
{
CV_LOG_VERBOSE(NULL, 5, "Thread: checking for new job");
if (CV_WORKER_ACTIVE_WAIT_THREADS_LIMIT == 0)
allow_active_wait = true;
Ptr<ParallelJob> j_ptr; swap(j_ptr, job);
has_wake_signal = false;
pthread_mutex_unlock(&mutex);
ParallelJob* j = j_ptr;
if (j)
{
CV_LOG_VERBOSE(NULL, 5, "Thread: job size=" << j->range.size() << " done=" << j->current_task);
if (j->current_task < j->range.size())
{
#ifdef CV_CXX11
int other = j->active_thread_count.fetch_add(1, std::memory_order_seq_cst);
#else
int other = CV_XADD(&j->active_thread_count, 1);
#endif
CV_LOG_VERBOSE(NULL, 5, "Thread: processing new job (with " << other << " other threads)"); CV_UNUSED(other);
#ifdef CV_PROFILE_THREADS
stat.threadExecuteStart = getTickCount();
stat.executedTasks = j->execute(true);
stat.threadExecuteStop = getTickCount();
#else
j->execute(true);
#endif
#ifdef CV_CXX11
int completed = j->completed_thread_count.fetch_add(1, std::memory_order_seq_cst) + 1;
int active = j->active_thread_count.load(std::memory_order_acquire);
#else
int completed = (int)CV_XADD(&j->completed_thread_count, 1) + 1;
int active = j->active_thread_count;
#endif
if (CV_WORKER_ACTIVE_WAIT_THREADS_LIMIT > 0)
{
allow_active_wait = true;
if (active >= CV_WORKER_ACTIVE_WAIT_THREADS_LIMIT && (id & 1) == 0) // turn off a half of threads
allow_active_wait = false;
}
CV_LOG_VERBOSE(NULL, 5, "Thread: completed job processing: " << active << " " << completed);
if (active == completed)
{
bool need_signal = !j->is_completed;
j->is_completed = true;
j = NULL; j_ptr.release();
if (need_signal)
{
CV_LOG_VERBOSE(NULL, 5, "Thread: job finished => notifying the main thread");
pthread_mutex_lock(&thread_pool.mutex_notify); // to avoid signal miss due pre-check condition
// empty
pthread_mutex_unlock(&thread_pool.mutex_notify);
pthread_cond_broadcast/*pthread_cond_signal*/(&thread_pool.cond_thread_task_complete);
}
}
}
else
{
has_wake_signal = false;
CV_LOG_VERBOSE(NULL, 5, "Thread: no free job tasks");
}
}
}
else
{
pthread_mutex_unlock(&mutex);
}
#ifdef CV_PROFILE_THREADS
stat.threadFree = getTickCount();
stat.keepActive = allow_active_wait;
#endif
}
}
ThreadPool::ThreadPool()
{
#ifdef CV_PROFILE_THREADS
tickFreq = getTickFrequency();
#endif
int res = 0;
res |= pthread_mutex_init(&mutex, NULL);
res |= pthread_mutex_init(&mutex_notify, NULL);
#if defined(CV_USE_GLOBAL_WORKERS_COND_VAR)
res |= pthread_cond_init(&cond_thread_wake, NULL);
#endif
res |= pthread_cond_init(&cond_thread_task_complete, NULL);
if (0 != res)
{
CV_LOG_FATAL(NULL, "Failed to initialize ThreadPool (pthreads)");
}
num_threads = defaultNumberOfThreads();
}
bool ThreadPool::reconfigure_(unsigned new_threads_count)
{
if (new_threads_count == threads.size())
return false;
if (new_threads_count < threads.size())
{
CV_LOG_VERBOSE(NULL, 1, "MainThread: reduce worker pool: " << threads.size() << " => " << new_threads_count);
std::vector< Ptr<WorkerThread> > release_threads(threads.size() - new_threads_count);
for (size_t i = new_threads_count; i < threads.size(); ++i)
{
pthread_mutex_lock(&threads[i]->mutex); // to avoid signal miss due pre-check
threads[i]->stop_thread = true;
threads[i]->has_wake_signal = true;
#if !defined(CV_USE_GLOBAL_WORKERS_COND_VAR)
pthread_mutex_unlock(&threads[i]->mutex);
pthread_cond_broadcast/*pthread_cond_signal*/(&threads[i]->cond_thread_wake); // wake thread
#else
pthread_mutex_unlock(&threads[i]->mutex);
#endif
std::swap(threads[i], release_threads[i - new_threads_count]);
}
#if defined(CV_USE_GLOBAL_WORKERS_COND_VAR)
CV_LOG_VERBOSE(NULL, 1, "MainThread: notify worker threads about termination...");
pthread_cond_broadcast(&cond_thread_wake); // wake all threads
#endif
threads.resize(new_threads_count);
release_threads.clear(); // calls thread_join which want to lock mutex
return false;
}
else
{
CV_LOG_VERBOSE(NULL, 1, "MainThread: upgrade worker pool: " << threads.size() << " => " << new_threads_count);
for (size_t i = threads.size(); i < new_threads_count; ++i)
{
threads.push_back(Ptr<WorkerThread>(new WorkerThread(*this, (unsigned)i))); // spawn more threads
}
}
return false;
}
ThreadPool::~ThreadPool()
{
reconfigure(0);
pthread_cond_destroy(&cond_thread_task_complete);
#if defined(CV_USE_GLOBAL_WORKERS_COND_VAR)
pthread_cond_destroy(&cond_thread_wake);
#endif
pthread_mutex_destroy(&mutex);
pthread_mutex_destroy(&mutex_notify);
}
void ThreadPool::run(const Range& range, const ParallelLoopBody& body, double nstripes)
{
CV_LOG_VERBOSE(NULL, 1, "MainThread: new parallel job: num_threads=" << num_threads << " range=" << range.size() << " nstripes=" << nstripes << " job=" << (void*)job);
#ifdef CV_PROFILE_THREADS
jobSubmitTime = getTickCount();
threads_stat[0].reset();
threads_stat[0].threadWait = jobSubmitTime;
threads_stat[0].threadWake = jobSubmitTime;
#endif
if (getNumOfThreads() > 1 &&
job == NULL &&
(range.size() * nstripes >= 2 || (range.size() > 1 && nstripes <= 0))
)
{
pthread_mutex_lock(&mutex);
if (job != NULL)
{
pthread_mutex_unlock(&mutex);
body(range);
return;
}
reconfigure_(num_threads - 1);
{
CV_LOG_VERBOSE(NULL, 1, "MainThread: initialize parallel job: " << range.size());
job = Ptr<ParallelJob>(new ParallelJob(*this, range, body, nstripes));
pthread_mutex_unlock(&mutex);
CV_LOG_VERBOSE(NULL, 5, "MainThread: wake worker threads...");
for (size_t i = 0; i < threads.size(); ++i)
{
#if !defined(CV_USE_GLOBAL_WORKERS_COND_VAR)
bool isActive = threads[i]->isActive;
if (isActive || threads[i]->has_wake_signal)
#else
if (threads[i]->has_wake_signal)
#endif
{
pthread_mutex_lock(&threads[i]->mutex);
threads[i]->job = job;
#if !defined(CV_USE_GLOBAL_WORKERS_COND_VAR)
isActive = threads[i]->isActive;
#endif
threads[i]->dont_wait = true;
#ifdef CV_PROFILE_THREADS
threads_stat[i + 1].reset();
#endif
pthread_mutex_unlock(&threads[i]->mutex);
threads[i]->has_wake_signal = true;
#if !defined(CV_USE_GLOBAL_WORKERS_COND_VAR)
if (!isActive)
{
pthread_cond_broadcast/*pthread_cond_signal*/(&threads[i]->cond_thread_wake); // wake thread
}
#endif
}
else
{
CV_Assert(threads[i]->job.empty());
threads[i]->job = job;
threads[i]->dont_wait = true;
threads[i]->has_wake_signal = true;
#ifdef CV_PROFILE_THREADS
threads_stat[i + 1].reset();
#endif
#if !defined(CV_USE_GLOBAL_WORKERS_COND_VAR)
pthread_cond_broadcast/*pthread_cond_signal*/(&threads[i]->cond_thread_wake); // wake thread
#endif
}
}
#ifdef CV_PROFILE_THREADS
threads_stat[0].threadPing = getTickCount();
#endif
#if defined(CV_USE_GLOBAL_WORKERS_COND_VAR)
pthread_cond_broadcast(&cond_thread_wake); // wake all threads
#endif
#ifdef CV_PROFILE_THREADS
threads_stat[0].threadWake = getTickCount();
#endif
CV_LOG_VERBOSE(NULL, 5, "MainThread: wake worker threads... (done)");
{
ParallelJob& j = *(this->job);
#ifdef CV_PROFILE_THREADS
threads_stat[0].threadExecuteStart = getTickCount();
threads_stat[0].executedTasks = j.execute(false);
threads_stat[0].threadExecuteStop = getTickCount();
#else
j.execute(false);
#endif
CV_Assert(j.current_task >= j.range.size());
CV_LOG_VERBOSE(NULL, 5, "MainThread: complete self-tasks: " << j.active_thread_count << " " << j.completed_thread_count);
if (job->is_completed || j.active_thread_count == 0)
{
job->is_completed = true;
CV_LOG_VERBOSE(NULL, 5, "MainThread: no WIP worker threads");
}
else
{
if (CV_MAIN_THREAD_ACTIVE_WAIT > 0)
{
for (int i = 0; i < CV_MAIN_THREAD_ACTIVE_WAIT; i++) // don't spin too much in any case (inaccurate getTickCount())
{
if (job->is_completed)
{
CV_LOG_VERBOSE(NULL, 5, "MainThread: job finalize (active wait) " << j.active_thread_count << " " << j.completed_thread_count);
break;
}
if (CV_ACTIVE_WAIT_PAUSE_LIMIT > 0 && (i < CV_ACTIVE_WAIT_PAUSE_LIMIT || (i & 1)))
CV_PAUSE(16);
else
CV_YIELD();
}
}
if (!job->is_completed)
{
CV_LOG_VERBOSE(NULL, 5, "MainThread: prepare wait " << j.active_thread_count << " " << j.completed_thread_count);
pthread_mutex_lock(&mutex_notify);
for (;;)
{
if (job->is_completed)
{
CV_LOG_VERBOSE(NULL, 5, "MainThread: job finalize (wait) " << j.active_thread_count << " " << j.completed_thread_count);
break;
}
CV_LOG_VERBOSE(NULL, 5, "MainThread: wait completion (sleep) ...");
pthread_cond_wait(&cond_thread_task_complete, &mutex_notify);
CV_LOG_VERBOSE(NULL, 5, "MainThread: wake");
}
pthread_mutex_unlock(&mutex_notify);
}
}
}
#ifdef CV_PROFILE_THREADS
threads_stat[0].threadFree = getTickCount();
std::cout << "Job: sz=" << range.size() << " nstripes=" << nstripes << " Time: " << (threads_stat[0].threadFree - jobSubmitTime) / tickFreq * 1e6 << " usec" << std::endl;
for (int i = 0; i < (int)threads.size() + 1; i++)
{
threads_stat[i].dump(i - 1, jobSubmitTime, tickFreq);
}
#endif
if (job)
{
pthread_mutex_lock(&mutex);
CV_LOG_VERBOSE(NULL, 5, "MainThread: job release");
CV_Assert(job->is_completed);
job.release();
pthread_mutex_unlock(&mutex);
}
}
}
else
{
body(range);
}
}
size_t ThreadPool::getNumOfThreads()
{
return num_threads;
}
void ThreadPool::setNumOfThreads(unsigned n)
{
if (n != num_threads)
{
num_threads = n;
if (n == 1)
if (job == NULL) reconfigure(0); // stop worker threads immediatelly
}
}
size_t parallel_pthreads_get_threads_num()
{
return ThreadPool::instance().getNumOfThreads();
}
void parallel_pthreads_set_threads_num(int num)
{
if(num < 0)
{
ThreadPool::instance().setNumOfThreads(0);
}
else
{
ThreadPool::instance().setNumOfThreads(unsigned(num));
}
}
void parallel_for_pthreads(const Range& range, const ParallelLoopBody& body, double nstripes)
{
ThreadPool::instance().run(range, body, nstripes);
}
}
#endif

@ -0,0 +1,17 @@
// This file is part of OpenCV project.
// It is subject to the license terms in the LICENSE file found in the top-level directory
// of this distribution and at http://opencv.org/license.html.
#ifndef OPENCV_CORE_PARALLEL_IMPL_HPP
#define OPENCV_CORE_PARALLEL_IMPL_HPP
namespace cv {
unsigned defaultNumberOfThreads();
void parallel_for_pthreads(const Range& range, const ParallelLoopBody& body, double nstripes);
size_t parallel_pthreads_get_threads_num();
void parallel_pthreads_set_threads_num(int num);
}
#endif // OPENCV_CORE_PARALLEL_IMPL_HPP

@ -1,581 +0,0 @@
/*M///////////////////////////////////////////////////////////////////////////////////////
//
// IMPORTANT: READ BEFORE DOWNLOADING, COPYING, INSTALLING OR USING.
//
// By downloading, copying, installing or using the software you agree to this license.
// If you do not agree to this license, do not download, install,
// copy or use the software.
//
//
// License Agreement
// For Open Source Computer Vision Library
//
// Copyright (C) 2000-2008, Intel Corporation, all rights reserved.
// Copyright (C) 2009-2011, Willow Garage Inc., all rights reserved.
// Third party copyrights are property of their respective owners.
//
// Redistribution and use in source and binary forms, with or without modification,
// are permitted provided that the following conditions are met:
//
// * Redistribution's of source code must retain the above copyright notice,
// this list of conditions and the following disclaimer.
//
// * Redistribution's in binary form must reproduce the above copyright notice,
// this list of conditions and the following disclaimer in the documentation
// and/or other materials provided with the distribution.
//
// * The name of the copyright holders may not be used to endorse or promote products
// derived from this software without specific prior written permission.
//
// This software is provided by the copyright holders and contributors "as is" and
// any express or implied warranties, including, but not limited to, the implied
// warranties of merchantability and fitness for a particular purpose are disclaimed.
// In no event shall the Intel Corporation or contributors be liable for any direct,
// indirect, incidental, special, exemplary, or consequential damages
// (including, but not limited to, procurement of substitute goods or services;
// loss of use, data, or profits; or business interruption) however caused
// and on any theory of liability, whether in contract, strict liability,
// or tort (including negligence or otherwise) arising in any way out of
// the use of this software, even if advised of the possibility of such damage.
//
//M*/
#include "precomp.hpp"
#ifdef HAVE_PTHREADS_PF
#include <algorithm>
#include <pthread.h>
namespace cv
{
class ThreadManager;
enum ForThreadState
{
eFTNotStarted = 0,
eFTStarted = 1,
eFTToStop = 2,
eFTStoped = 3
};
enum ThreadManagerPoolState
{
eTMNotInited = 0,
eTMFailedToInit = 1,
eTMInited = 2,
eTMSingleThreaded = 3
};
struct work_load
{
work_load()
{
clear();
}
work_load(const cv::Range& range, const cv::ParallelLoopBody& body, int nstripes)
{
set(range, body, nstripes);
}
void set(const cv::Range& range, const cv::ParallelLoopBody& body, unsigned int nstripes)
{
m_body = &body;
m_range = &range;
//ensure that nstripes not larger than range length
m_nstripes = std::min( unsigned(m_range->end - m_range->start) , nstripes);
m_block_size = ((m_range->end - m_range->start - 1)/m_nstripes) + 1;
//ensure that nstripes not larger than blocks count, so we would never go out of range
m_nstripes = std::min(m_nstripes, unsigned(((m_range->end - m_range->start - 1)/m_block_size) + 1) );
}
const cv::ParallelLoopBody* m_body;
const cv::Range* m_range;
unsigned int m_nstripes;
int m_block_size;
void clear()
{
m_body = 0;
m_range = 0;
m_nstripes = 0;
m_block_size = 0;
}
};
class ForThread
{
public:
ForThread(): m_posix_thread(0), m_task_start(false), m_parent(0), m_state(eFTNotStarted), m_id(0)
{
}
//called from manager thread
bool init(size_t id, ThreadManager* parent);
//called from manager thread
void run();
//called from manager thread
void stop();
~ForThread();
private:
//called from worker thread
static void* thread_loop_wrapper(void* thread_object);
//called from worker thread
void execute();
//called from worker thread
void thread_body();
pthread_t m_posix_thread;
pthread_mutex_t m_thread_mutex;
pthread_cond_t m_cond_thread_task;
volatile bool m_task_start;
ThreadManager* m_parent;
volatile ForThreadState m_state;
size_t m_id;
};
class ThreadManager
{
public:
friend class ForThread;
static ThreadManager& instance()
{
CV_SINGLETON_LAZY_INIT_REF(ThreadManager, new ThreadManager())
}
static void stop()
{
ThreadManager& manager = instance();
if(manager.m_pool_state == eTMInited)
{
for(size_t i = 0; i < manager.m_num_threads; ++i)
{
manager.m_threads[i].stop();
}
}
manager.m_pool_state = eTMNotInited;
}
void run(const cv::Range& range, const cv::ParallelLoopBody& body, double nstripes);
size_t getNumOfThreads();
void setNumOfThreads(size_t n);
private:
ThreadManager();
~ThreadManager();
void wait_complete();
void notify_complete();
bool initPool();
size_t defaultNumberOfThreads();
std::vector<ForThread> m_threads;
size_t m_num_threads;
pthread_mutex_t m_manager_task_mutex;
pthread_cond_t m_cond_thread_task_complete;
bool m_task_complete;
unsigned int m_task_position;
unsigned int m_num_of_completed_tasks;
pthread_mutex_t m_manager_access_mutex;
static const char m_env_name[];
work_load m_work_load;
struct work_thread_t
{
work_thread_t(): value(false) { }
bool value;
};
cv::TLSData<work_thread_t> m_is_work_thread;
ThreadManagerPoolState m_pool_state;
};
const char ThreadManager::m_env_name[] = "OPENCV_FOR_THREADS_NUM";
ForThread::~ForThread()
{
if(m_state == eFTStarted)
{
stop();
pthread_mutex_destroy(&m_thread_mutex);
pthread_cond_destroy(&m_cond_thread_task);
}
}
bool ForThread::init(size_t id, ThreadManager* parent)
{
m_id = id;
m_parent = parent;
int res = 0;
res |= pthread_mutex_init(&m_thread_mutex, NULL);
res |= pthread_cond_init(&m_cond_thread_task, NULL);
if(!res)
{
res = pthread_create(&m_posix_thread, NULL, thread_loop_wrapper, (void*)this);
}
return res == 0;
}
void ForThread::stop()
{
if(m_state == eFTStarted)
{
pthread_mutex_lock(&m_thread_mutex);
m_state = eFTToStop;
pthread_mutex_unlock(&m_thread_mutex);
run();
pthread_join(m_posix_thread, NULL);
}
pthread_mutex_lock(&m_thread_mutex);
m_state = eFTStoped;
pthread_mutex_unlock(&m_thread_mutex);
}
void ForThread::run()
{
pthread_mutex_lock(&m_thread_mutex);
m_task_start = true;
pthread_cond_signal(&m_cond_thread_task);
pthread_mutex_unlock(&m_thread_mutex);
}
void* ForThread::thread_loop_wrapper(void* thread_object)
{
((ForThread*)thread_object)->thread_body();
return 0;
}
void ForThread::execute()
{
unsigned int m_current_pos = CV_XADD(&m_parent->m_task_position, 1);
work_load& load = m_parent->m_work_load;
while(m_current_pos < load.m_nstripes)
{
int start = load.m_range->start + m_current_pos*load.m_block_size;
int end = std::min(start + load.m_block_size, load.m_range->end);
load.m_body->operator()(cv::Range(start, end));
m_current_pos = CV_XADD(&m_parent->m_task_position, 1);
}
}
void ForThread::thread_body()
{
(void)cv::utils::getThreadID(); // notify OpenCV about new thread
m_parent->m_is_work_thread.get()->value = true;
pthread_mutex_lock(&m_thread_mutex);
m_state = eFTStarted;
while(m_state == eFTStarted)
{
//to handle spurious wakeups
while( !m_task_start && m_state != eFTToStop )
pthread_cond_wait(&m_cond_thread_task, &m_thread_mutex);
if(m_state == eFTStarted)
{
execute();
m_task_start = false;
m_parent->notify_complete();
}
}
pthread_mutex_unlock(&m_thread_mutex);
}
ThreadManager::ThreadManager(): m_num_threads(0), m_task_complete(false), m_num_of_completed_tasks(0), m_pool_state(eTMNotInited)
{
int res = 0;
pthread_mutexattr_t attr;
pthread_mutexattr_init(&attr);
pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
res |= pthread_mutex_init(&m_manager_access_mutex, &attr);
pthread_mutexattr_destroy(&attr);
res |= pthread_mutex_init(&m_manager_task_mutex, NULL);
res |= pthread_cond_init(&m_cond_thread_task_complete, NULL);
if(!res)
{
setNumOfThreads(defaultNumberOfThreads());
m_task_position = 0;
}
else
{
m_num_threads = 1;
m_pool_state = eTMFailedToInit;
m_task_position = 0;
//print error;
}
}
ThreadManager::~ThreadManager()
{
stop();
pthread_mutex_destroy(&m_manager_task_mutex);
pthread_cond_destroy(&m_cond_thread_task_complete);
pthread_mutex_destroy(&m_manager_access_mutex);
}
void ThreadManager::run(const cv::Range& range, const cv::ParallelLoopBody& body, double nstripes)
{
bool is_work_thread = m_is_work_thread.get()->value;
if( (getNumOfThreads() > 1) && !is_work_thread &&
(range.end - range.start > 1) && (nstripes <= 0 || nstripes >= 1.5) )
{
int res = pthread_mutex_trylock(&m_manager_access_mutex);
if(!res)
{
if(initPool())
{
if(nstripes < 1) nstripes = 4*m_threads.size();
double max_stripes = 4*m_threads.size();
nstripes = std::min(nstripes, max_stripes);
pthread_mutex_lock(&m_manager_task_mutex);
m_num_of_completed_tasks = 0;
m_task_position = 0;
m_task_complete = false;
m_work_load.set(range, body, cvCeil(nstripes));
for(size_t i = 0; i < m_threads.size(); ++i)
{
m_threads[i].run();
}
wait_complete();
}
else
{
//print error
body(range);
}
}
else
{
body(range);
}
}
else
{
body(range);
}
}
void ThreadManager::wait_complete()
{
//to handle spurious wakeups
while(!m_task_complete)
pthread_cond_wait(&m_cond_thread_task_complete, &m_manager_task_mutex);
pthread_mutex_unlock(&m_manager_task_mutex);
pthread_mutex_unlock(&m_manager_access_mutex);
}
void ThreadManager::notify_complete()
{
unsigned int comp = CV_XADD(&m_num_of_completed_tasks, 1);
if(comp == (m_num_threads - 1))
{
pthread_mutex_lock(&m_manager_task_mutex);
m_task_complete = true;
pthread_cond_signal(&m_cond_thread_task_complete);
pthread_mutex_unlock(&m_manager_task_mutex);
}
}
bool ThreadManager::initPool()
{
if(m_pool_state != eTMNotInited || m_num_threads == 1)
return true;
m_threads.resize(m_num_threads);
bool res = true;
for(size_t i = 0; i < m_threads.size(); ++i)
{
res |= m_threads[i].init(i, this);
}
if(res)
{
m_pool_state = eTMInited;
}
else
{
//TODO: join threads?
m_pool_state = eTMFailedToInit;
}
return res;
}
size_t ThreadManager::getNumOfThreads()
{
return m_num_threads;
}
void ThreadManager::setNumOfThreads(size_t n)
{
int res = pthread_mutex_lock(&m_manager_access_mutex);
if(!res)
{
if(n == 0)
{
n = defaultNumberOfThreads();
}
if(n != m_num_threads && m_pool_state != eTMFailedToInit)
{
if(m_pool_state == eTMInited)
{
stop();
m_threads.clear();
}
m_num_threads = n;
if(m_num_threads == 1)
{
m_pool_state = eTMSingleThreaded;
}
else
{
m_pool_state = eTMNotInited;
}
}
pthread_mutex_unlock(&m_manager_access_mutex);
}
}
size_t ThreadManager::defaultNumberOfThreads()
{
#ifdef __ANDROID__
// many modern phones/tables have 4-core CPUs. Let's use no more
// than 2 threads by default not to overheat the devices
const unsigned int default_number_of_threads = 2;
#else
const unsigned int default_number_of_threads = (unsigned int)std::max(1, cv::getNumberOfCPUs());
#endif
unsigned int result = default_number_of_threads;
char * env = getenv(m_env_name);
if(env != NULL)
{
sscanf(env, "%u", &result);
result = std::max(1u, result);
//do we need upper limit of threads number?
}
return result;
}
void parallel_for_pthreads(const cv::Range& range, const cv::ParallelLoopBody& body, double nstripes);
size_t parallel_pthreads_get_threads_num();
void parallel_pthreads_set_threads_num(int num);
size_t parallel_pthreads_get_threads_num()
{
return ThreadManager::instance().getNumOfThreads();
}
void parallel_pthreads_set_threads_num(int num)
{
if(num < 0)
{
ThreadManager::instance().setNumOfThreads(0);
}
else
{
ThreadManager::instance().setNumOfThreads(size_t(num));
}
}
void parallel_for_pthreads(const cv::Range& range, const cv::ParallelLoopBody& body, double nstripes)
{
ThreadManager::instance().run(range, body, nstripes);
}
}
#endif

@ -1693,9 +1693,10 @@ void TestBase::validateMetrics()
{ {
double mean = metrics.mean * 1000.0f / metrics.frequency; double mean = metrics.mean * 1000.0f / metrics.frequency;
double median = metrics.median * 1000.0f / metrics.frequency; double median = metrics.median * 1000.0f / metrics.frequency;
double min_value = metrics.min * 1000.0f / metrics.frequency;
double stddev = metrics.stddev * 1000.0f / metrics.frequency; double stddev = metrics.stddev * 1000.0f / metrics.frequency;
double percents = stddev / mean * 100.f; double percents = stddev / mean * 100.f;
printf("[ PERFSTAT ] (samples = %d, mean = %.2f, median = %.2f, stddev = %.2f (%.1f%%))\n", (int)metrics.samples, mean, median, stddev, percents); printf("[ PERFSTAT ] (samples=%d mean=%.2f median=%.2f min=%.2f stddev=%.2f (%.1f%%))\n", (int)metrics.samples, mean, median, min_value, stddev, percents);
} }
else else
{ {

Loading…
Cancel
Save