From 47998c03f8008d4b5c140e51aa20053d0449a126 Mon Sep 17 00:00:00 2001 From: Alexander Alekhin Date: Wed, 24 Jan 2018 13:40:56 +0300 Subject: [PATCH 1/2] perf: update message --- modules/ts/src/ts_perf.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/modules/ts/src/ts_perf.cpp b/modules/ts/src/ts_perf.cpp index 05ad22e691..259c289d67 100644 --- a/modules/ts/src/ts_perf.cpp +++ b/modules/ts/src/ts_perf.cpp @@ -1693,9 +1693,10 @@ void TestBase::validateMetrics() { double mean = metrics.mean * 1000.0f / metrics.frequency; double median = metrics.median * 1000.0f / metrics.frequency; + double min_value = metrics.min * 1000.0f / metrics.frequency; double stddev = metrics.stddev * 1000.0f / metrics.frequency; double percents = stddev / mean * 100.f; - printf("[ PERFSTAT ] (samples = %d, mean = %.2f, median = %.2f, stddev = %.2f (%.1f%%))\n", (int)metrics.samples, mean, median, stddev, percents); + printf("[ PERFSTAT ] (samples=%d mean=%.2f median=%.2f min=%.2f stddev=%.2f (%.1f%%))\n", (int)metrics.samples, mean, median, min_value, stddev, percents); } else { From c49d5d5252dd13a706cd2f01e6afc3dac083bdbd Mon Sep 17 00:00:00 2001 From: Alexander Alekhin Date: Fri, 19 Jan 2018 17:26:29 +0300 Subject: [PATCH 2/2] core: fix pthreads performance OpenCV pthreads-based implementation changes: - rework worker threads pool, allow to execute job by the main thread too - rework synchronization scheme (wait for job completion, threads 'pong' answer is not required) - allow "active wait" (spin) by worker threads and by the main thread - use _mm_pause() during active wait (support for Hyper-Threading technology) - use sched_yield() to avoid preemption of still working other workers - don't use getTickCount() - optional builtin thread pool profiler (disabled by compilation flag) --- modules/core/include/opencv2/core/utility.hpp | 2 + modules/core/src/parallel.cpp | 38 +- modules/core/src/parallel_impl.cpp | 765 ++++++++++++++++++ 
modules/core/src/parallel_impl.hpp | 17 + modules/core/src/parallel_pthreads.cpp | 581 ------------- 5 files changed, 814 insertions(+), 589 deletions(-) create mode 100644 modules/core/src/parallel_impl.cpp create mode 100644 modules/core/src/parallel_impl.hpp delete mode 100644 modules/core/src/parallel_pthreads.cpp diff --git a/modules/core/include/opencv2/core/utility.hpp b/modules/core/include/opencv2/core/utility.hpp index 69d114e156..8845843719 100644 --- a/modules/core/include/opencv2/core/utility.hpp +++ b/modules/core/include/opencv2/core/utility.hpp @@ -233,6 +233,8 @@ CV_EXPORTS_W int getNumThreads(); /** @brief Returns the index of the currently executed thread within the current parallel region. Always returns 0 if called outside of parallel region. +@deprecated Current implementation doesn't correspond to this documentation. + The exact meaning of the return value depends on the threading framework used by OpenCV library: - `TBB` - Unsupported with current 4.1 TBB release. Maybe will be supported in future. - `OpenMP` - The thread number, within the current team, of the calling thread. 
diff --git a/modules/core/src/parallel.cpp b/modules/core/src/parallel.cpp index 8ead5162de..b4dbc6a2b5 100644 --- a/modules/core/src/parallel.cpp +++ b/modules/core/src/parallel.cpp @@ -42,6 +42,7 @@ #include "precomp.hpp" +#include #include #if defined _WIN32 || defined WINCE @@ -125,19 +126,15 @@ # define CV_PARALLEL_FRAMEWORK "pthreads" #endif +#include "parallel_impl.hpp" + using namespace cv; namespace cv { ParallelLoopBody::~ParallelLoopBody() {} -#ifdef HAVE_PTHREADS_PF - void parallel_for_pthreads(const cv::Range& range, const cv::ParallelLoopBody& body, double nstripes); - size_t parallel_pthreads_get_threads_num(); - void parallel_pthreads_set_threads_num(int num); -#endif } - namespace { #ifdef CV_PARALLEL_FRAMEWORK @@ -536,10 +533,35 @@ int cv::getNumThreads(void) #endif } -void cv::setNumThreads( int threads ) +namespace cv { +unsigned defaultNumberOfThreads() +{ +#ifdef __ANDROID__ + // many modern phones/tables have 4-core CPUs. Let's use no more + // than 2 threads by default not to overheat the devices + const unsigned int default_number_of_threads = 2; +#else + const unsigned int default_number_of_threads = (unsigned int)std::max(1, cv::getNumberOfCPUs()); +#endif + + unsigned result = default_number_of_threads; + + static int config_num_threads = (int)utils::getConfigurationParameterSizeT("OPENCV_FOR_THREADS_NUM", 0); + + if (config_num_threads) + { + result = (unsigned)std::max(1, config_num_threads); + //do we need upper limit of threads number? + } + return result; +} +} + +void cv::setNumThreads( int threads_ ) { - (void)threads; + (void)threads_; #ifdef CV_PARALLEL_FRAMEWORK + int threads = (threads_ < 0) ? defaultNumberOfThreads() : (unsigned)threads_; numThreads = threads; #endif diff --git a/modules/core/src/parallel_impl.cpp b/modules/core/src/parallel_impl.cpp new file mode 100644 index 0000000000..8e907905e8 --- /dev/null +++ b/modules/core/src/parallel_impl.cpp @@ -0,0 +1,765 @@ +// This file is part of OpenCV project. 
+// It is subject to the license terms in the LICENSE file found in the top-level directory +// of this distribution and at http://opencv.org/license.html. + +#include "precomp.hpp" + +#include "parallel_impl.hpp" + +#ifdef HAVE_PTHREADS_PF +#include + +#include + +#include +//#undef CV_LOG_STRIP_LEVEL +//#define CV_LOG_STRIP_LEVEL CV_LOG_LEVEL_VERBOSE + 1 +#include + +//#define CV_PROFILE_THREADS 64 +//#define getTickCount getCPUTickCount // use this if getTickCount() calls are expensive (and getCPUTickCount() is accurate) + +//#define CV_USE_GLOBAL_WORKERS_COND_VAR // not effective on many-core systems (10+) + +#ifdef CV_CXX11 +#include +#else +#include // _POSIX_PRIORITY_SCHEDULING +#endif + +// Spin lock's OS-level yield +#ifdef DECLARE_CV_YIELD +DECLARE_CV_YIELD +#endif +#ifndef CV_YIELD +# ifdef CV_CXX11 +# include +# define CV_YIELD() std::this_thread::yield() +# elif defined(_POSIX_PRIORITY_SCHEDULING) +# include +# define CV_YIELD() sched_yield() +# else +# warning "Can't detect sched_yield() on the target platform. Specify CV_YIELD() definition via compiler flags." +# define CV_YIELD() /* no-op: works, but not effective */ +# endif +#endif // CV_YIELD + +// Spin lock's CPU-level yield (required for Hyper-Threading) +#ifdef DECLARE_CV_PAUSE +DECLARE_CV_PAUSE +#endif +#ifndef CV_PAUSE +#if defined __GNUC__ && (defined __i386__ || defined __x86_64__) +# define CV_PAUSE(v) do { for (int __delay = (v); __delay > 0; --__delay) { _mm_pause(); } } while (0) +# elif defined __GNUC__ && defined __aarch64__ +# define CV_PAUSE(v) do { for (int __delay = (v); __delay > 0; --__delay) { asm volatile("yield" ::: "memory"); } } while (0) +# elif defined __GNUC__ && defined __arm__ +# define CV_PAUSE(v) do { for (int __delay = (v); __delay > 0; --__delay) { asm volatile("" ::: "memory"); } } while (0) +# else +# warning "Can't detect 'pause' (CPU-yield) instruction on the target platform. Specify CV_PAUSE() definition via compiler flags." +# define CV_PAUSE(...) 
do { /* no-op: works, but not effective */ } while (0) +# endif +#endif // CV_PAUSE + + +namespace cv +{ + +static int CV_ACTIVE_WAIT_PAUSE_LIMIT = (int)utils::getConfigurationParameterSizeT("OPENCV_THREAD_POOL_ACTIVE_WAIT_PAUSE_LIMIT", 16); // iterations +static int CV_WORKER_ACTIVE_WAIT = (int)utils::getConfigurationParameterSizeT("OPENCV_THREAD_POOL_ACTIVE_WAIT_WORKER", 2000); // iterations +static int CV_MAIN_THREAD_ACTIVE_WAIT = (int)utils::getConfigurationParameterSizeT("OPENCV_THREAD_POOL_ACTIVE_WAIT_MAIN", 10000); // iterations + +static int CV_WORKER_ACTIVE_WAIT_THREADS_LIMIT = (int)utils::getConfigurationParameterSizeT("OPENCV_THREAD_POOL_ACTIVE_WAIT_THREADS_LIMIT", 0); // number of real cores + +class WorkerThread; +class ParallelJob; + +class ThreadPool +{ +public: + static ThreadPool& instance() + { + CV_SINGLETON_LAZY_INIT_REF(ThreadPool, new ThreadPool()) + } + + static void stop() + { + ThreadPool& manager = instance(); + manager.reconfigure(0); + } + + void reconfigure(unsigned new_threads_count) + { + if (new_threads_count == threads.size()) + return; + pthread_mutex_lock(&mutex); + reconfigure_(new_threads_count); + pthread_mutex_unlock(&mutex); + } + bool reconfigure_(unsigned new_threads_count); // internal implementation + + void run(const Range& range, const ParallelLoopBody& body, double nstripes); + + size_t getNumOfThreads(); + + void setNumOfThreads(unsigned n); + + ThreadPool(); + + ~ThreadPool(); + + unsigned num_threads; + + pthread_mutex_t mutex; // guards fields (job/threads) from non-worker threads (concurrent parallel_for calls) +#if defined(CV_USE_GLOBAL_WORKERS_COND_VAR) + pthread_cond_t cond_thread_wake; +#endif + + pthread_mutex_t mutex_notify; + pthread_cond_t cond_thread_task_complete; + + std::vector< Ptr > threads; + + Ptr job; + +#ifdef CV_PROFILE_THREADS + double tickFreq; + int64 jobSubmitTime; + struct ThreadStatistics + { + ThreadStatistics() : threadWait(0) + { + reset(); + } + void reset() + { + threadWake = 0; + 
threadExecuteStart = 0; + threadExecuteStop = 0; + executedTasks = 0; + keepActive = false; + threadPing = getTickCount(); + } + int64 threadWait; // don't reset by default + int64 threadPing; // don't reset by default + int64 threadWake; + int64 threadExecuteStart; + int64 threadExecuteStop; + int64 threadFree; + unsigned executedTasks; + bool keepActive; + + int64 dummy_[8]; // separate cache lines + + void dump(int id, int64 baseTime, double tickFreq) + { + if (id < 0) + std::cout << "Main: "; + else + printf("T%03d: ", id + 2); + printf("wait=% 10.1f ping=% 6.1f", + threadWait > 0 ? (threadWait - baseTime) / tickFreq * 1e6 : -0.0, + threadPing > 0 ? (threadPing - baseTime) / tickFreq * 1e6 : -0.0); + if (threadWake > 0) + printf(" wake=% 6.1f", + (threadWake > 0 ? (threadWake - baseTime) / tickFreq * 1e6 : -0.0)); + if (threadExecuteStart > 0) + { + printf(" exec=% 6.1f - % 6.1f tasksDone=%5u free=% 6.1f", + (threadExecuteStart > 0 ? (threadExecuteStart - baseTime) / tickFreq * 1e6 : -0.0), + (threadExecuteStop > 0 ? (threadExecuteStop - baseTime) / tickFreq * 1e6 : -0.0), + executedTasks, + (threadFree > 0 ? (threadFree - baseTime) / tickFreq * 1e6 : -0.0)); + if (id >= 0) + printf(" active=%s\n", keepActive ? 
"true" : "false"); + else + printf("\n"); + } + else + printf(" ------------------------------------------------------------------------------\n"); + } + }; + ThreadStatistics threads_stat[CV_PROFILE_THREADS]; // 0 - main thread, 1..N - worker threads +#endif + +}; + +class WorkerThread +{ +public: + ThreadPool& thread_pool; + unsigned id; + pthread_t posix_thread; + bool is_created; + + volatile bool stop_thread; + + volatile bool has_wake_signal; + volatile bool dont_wait; + + Ptr job; + + pthread_mutex_t mutex; +#if !defined(CV_USE_GLOBAL_WORKERS_COND_VAR) + volatile bool isActive; + pthread_cond_t cond_thread_wake; +#endif + + WorkerThread(ThreadPool& thread_pool_, unsigned id_) : + thread_pool(thread_pool_), + id(id_), + posix_thread(0), + is_created(false), + stop_thread(false), + has_wake_signal(false), + dont_wait(false) +#if !defined(CV_USE_GLOBAL_WORKERS_COND_VAR) + , isActive(true) +#endif + { + CV_LOG_VERBOSE(NULL, 1, "MainThread: initializing new worker: " << id); + int res = pthread_mutex_init(&mutex, NULL); + if (res != 0) + { + CV_LOG_ERROR(NULL, id << ": Can't create thread mutex: res = " << res); + return; + } +#if !defined(CV_USE_GLOBAL_WORKERS_COND_VAR) + res = pthread_cond_init(&cond_thread_wake, NULL); + if (res != 0) + { + CV_LOG_ERROR(NULL, id << ": Can't create thread condition variable: res = " << res); + return; + } +#endif + res = pthread_create(&posix_thread, NULL, thread_loop_wrapper, (void*)this); + if (res != 0) + { + CV_LOG_ERROR(NULL, id << ": Can't spawn new thread: res = " << res); + } + else + { + is_created = true; + } + } + + ~WorkerThread() + { + CV_LOG_VERBOSE(NULL, 1, "MainThread: destroy worker thread: " << id); + if (is_created) + { + if (!stop_thread) + { + pthread_mutex_lock(&mutex); // to avoid signal miss due pre-check + stop_thread = true; + pthread_mutex_unlock(&mutex); +#if defined(CV_USE_GLOBAL_WORKERS_COND_VAR) + pthread_cond_broadcast(&thread_pool.cond_thread_wake); +#else + 
pthread_cond_signal(&cond_thread_wake); +#endif + } + pthread_join(posix_thread, NULL); + } +#if !defined(CV_USE_GLOBAL_WORKERS_COND_VAR) + pthread_cond_destroy(&cond_thread_wake); +#endif + pthread_mutex_destroy(&mutex); + } + + void thread_body(); + static void* thread_loop_wrapper(void* thread_object) + { + ((WorkerThread*)thread_object)->thread_body(); + return 0; + } +}; + +class ParallelJob +{ +public: + ParallelJob(const ThreadPool& thread_pool_, const Range& range_, const ParallelLoopBody& body_, int nstripes_) : + thread_pool(thread_pool_), + body(body_), + range(range_), + nstripes((unsigned)nstripes_), + is_completed(false) + { + CV_LOG_VERBOSE(NULL, 5, "ParallelJob::ParallelJob(" << (void*)this << ")"); +#ifdef CV_CXX11 + current_task.store(0, std::memory_order_relaxed); + active_thread_count.store(0, std::memory_order_relaxed); + completed_thread_count.store(0, std::memory_order_relaxed); +#else + current_task = 0; + active_thread_count = 0; + completed_thread_count = 0; +#endif + dummy0_[0] = 0, dummy1_[0] = 0, dummy2_[0] = 0; // compiler warning + } + + ~ParallelJob() + { + CV_LOG_VERBOSE(NULL, 5, "ParallelJob::~ParallelJob(" << (void*)this << ")"); + } + + unsigned execute(bool is_worker_thread) + { + unsigned executed_tasks = 0; + const int task_count = range.size(); + const int remaining_multiplier = std::min(nstripes, + std::max( + std::min(100u, thread_pool.num_threads * 4), + thread_pool.num_threads * 2 + )); // experimental value + for (;;) + { + int chunk_size = std::max(1, (task_count - current_task) / remaining_multiplier); +#ifdef CV_CXX11 + int id = current_task.fetch_add(chunk_size, std::memory_order_seq_cst); +#else + int id = (int)CV_XADD(¤t_task, chunk_size); +#endif + if (id >= task_count) + break; // no more free tasks + + executed_tasks += chunk_size; + int start_id = id; + int end_id = std::min(task_count, id + chunk_size); + CV_LOG_VERBOSE(NULL, 9, "Thread: job " << start_id << "-" << end_id); + + //TODO: if (not pending 
exception) + { + body.operator()(Range(range.start + start_id, range.start + end_id)); + } + if (is_worker_thread && is_completed) + { + CV_LOG_ERROR(NULL, "\t\t\t\tBUG! Job: " << (void*)this << " " << id << " " << active_thread_count << " " << completed_thread_count); + CV_Assert(!is_completed); // TODO Dbg this + } + } + return executed_tasks; + } + + const ThreadPool& thread_pool; + const ParallelLoopBody& body; + const Range range; + const unsigned nstripes; +#ifdef CV_CXX11 + std::atomic current_task; // next free part of job + int64 dummy0_[8]; // avoid cache-line reusing for the same atomics + + std::atomic active_thread_count; // number of threads worked on this job + int64 dummy1_[8]; // avoid cache-line reusing for the same atomics + + std::atomic completed_thread_count; // number of threads completed any activities on this job + int64 dummy2_[8]; // avoid cache-line reusing for the same atomics +#else + /*CV_DECL_ALIGNED(64)*/ volatile int current_task; // next free part of job + int64 dummy0_[8]; // avoid cache-line reusing for the same atomics + + /*CV_DECL_ALIGNED(64)*/ volatile int active_thread_count; // number of threads worked on this job + int64 dummy1_[8]; // avoid cache-line reusing for the same atomics + + /*CV_DECL_ALIGNED(64)*/ volatile int completed_thread_count; // number of threads completed any activities on this job + int64 dummy2_[8]; // avoid cache-line reusing for the same atomics +#endif + + volatile bool is_completed; // std::atomic_flag ? + + // TODO exception handling +}; + + +void WorkerThread::thread_body() +{ + (void)cv::utils::getThreadID(); // notify OpenCV about new thread + CV_LOG_VERBOSE(NULL, 5, "Thread: new thread: " << id); + + bool allow_active_wait = true; + +#ifdef CV_PROFILE_THREADS + ThreadPool::ThreadStatistics& stat = thread_pool.threads_stat[id + 1]; +#endif + + while (!stop_thread) + { + CV_LOG_VERBOSE(NULL, 5, "Thread: ... 
loop iteration: allow_active_wait=" << allow_active_wait << " has_wake_signal=" << has_wake_signal << " dont_wait=" << dont_wait ); + if (allow_active_wait && CV_WORKER_ACTIVE_WAIT > 0) + { + allow_active_wait = false; + for (int i = 0; i < CV_WORKER_ACTIVE_WAIT; i++) + { + if (has_wake_signal) + break; + if (CV_ACTIVE_WAIT_PAUSE_LIMIT > 0 && (i < CV_ACTIVE_WAIT_PAUSE_LIMIT || (i & 1))) + CV_PAUSE(16); + else + CV_YIELD(); + } + } + pthread_mutex_lock(&mutex); +#ifdef CV_PROFILE_THREADS + stat.threadWait = getTickCount(); +#endif + while (!has_wake_signal && !dont_wait) // to handle spurious wakeups + { + //CV_LOG_VERBOSE(NULL, 5, "Thread: wait (sleep) ..."); +#if defined(CV_USE_GLOBAL_WORKERS_COND_VAR) + pthread_cond_wait(&thread_pool.cond_thread_wake, &mutex); +#else + isActive = false; + pthread_cond_wait(&cond_thread_wake, &mutex); + isActive = true; +#endif + CV_LOG_VERBOSE(NULL, 5, "Thread: wake ... (has_wake_signal=" << has_wake_signal << " stop_thread=" << stop_thread << ")") + } + dont_wait = false; +#ifdef CV_PROFILE_THREADS + stat.threadWake = getTickCount(); +#endif + + if (!stop_thread) + { + CV_LOG_VERBOSE(NULL, 5, "Thread: checking for new job"); + if (CV_WORKER_ACTIVE_WAIT_THREADS_LIMIT == 0) + allow_active_wait = true; + Ptr j_ptr; swap(j_ptr, job); + has_wake_signal = false; + pthread_mutex_unlock(&mutex); + ParallelJob* j = j_ptr; + if (j) + { + CV_LOG_VERBOSE(NULL, 5, "Thread: job size=" << j->range.size() << " done=" << j->current_task); + if (j->current_task < j->range.size()) + { +#ifdef CV_CXX11 + int other = j->active_thread_count.fetch_add(1, std::memory_order_seq_cst); +#else + int other = CV_XADD(&j->active_thread_count, 1); +#endif + CV_LOG_VERBOSE(NULL, 5, "Thread: processing new job (with " << other << " other threads)"); CV_UNUSED(other); +#ifdef CV_PROFILE_THREADS + stat.threadExecuteStart = getTickCount(); + stat.executedTasks = j->execute(true); + stat.threadExecuteStop = getTickCount(); +#else + j->execute(true); +#endif +#ifdef 
CV_CXX11 + int completed = j->completed_thread_count.fetch_add(1, std::memory_order_seq_cst) + 1; + int active = j->active_thread_count.load(std::memory_order_acquire); +#else + int completed = (int)CV_XADD(&j->completed_thread_count, 1) + 1; + int active = j->active_thread_count; +#endif + if (CV_WORKER_ACTIVE_WAIT_THREADS_LIMIT > 0) + { + allow_active_wait = true; + if (active >= CV_WORKER_ACTIVE_WAIT_THREADS_LIMIT && (id & 1) == 0) // turn off a half of threads + allow_active_wait = false; + } + CV_LOG_VERBOSE(NULL, 5, "Thread: completed job processing: " << active << " " << completed); + if (active == completed) + { + bool need_signal = !j->is_completed; + j->is_completed = true; + j = NULL; j_ptr.release(); + if (need_signal) + { + CV_LOG_VERBOSE(NULL, 5, "Thread: job finished => notifying the main thread"); + pthread_mutex_lock(&thread_pool.mutex_notify); // to avoid signal miss due pre-check condition + // empty + pthread_mutex_unlock(&thread_pool.mutex_notify); + pthread_cond_broadcast/*pthread_cond_signal*/(&thread_pool.cond_thread_task_complete); + } + } + } + else + { + has_wake_signal = false; + CV_LOG_VERBOSE(NULL, 5, "Thread: no free job tasks"); + } + } + } + else + { + pthread_mutex_unlock(&mutex); + } +#ifdef CV_PROFILE_THREADS + stat.threadFree = getTickCount(); + stat.keepActive = allow_active_wait; +#endif + } +} + +ThreadPool::ThreadPool() +{ +#ifdef CV_PROFILE_THREADS + tickFreq = getTickFrequency(); +#endif + + int res = 0; + res |= pthread_mutex_init(&mutex, NULL); + res |= pthread_mutex_init(&mutex_notify, NULL); +#if defined(CV_USE_GLOBAL_WORKERS_COND_VAR) + res |= pthread_cond_init(&cond_thread_wake, NULL); +#endif + res |= pthread_cond_init(&cond_thread_task_complete, NULL); + + if (0 != res) + { + CV_LOG_FATAL(NULL, "Failed to initialize ThreadPool (pthreads)"); + } + num_threads = defaultNumberOfThreads(); +} + +bool ThreadPool::reconfigure_(unsigned new_threads_count) +{ + if (new_threads_count == threads.size()) + return false; + + 
if (new_threads_count < threads.size()) + { + CV_LOG_VERBOSE(NULL, 1, "MainThread: reduce worker pool: " << threads.size() << " => " << new_threads_count); + std::vector< Ptr > release_threads(threads.size() - new_threads_count); + for (size_t i = new_threads_count; i < threads.size(); ++i) + { + pthread_mutex_lock(&threads[i]->mutex); // to avoid signal miss due pre-check + threads[i]->stop_thread = true; + threads[i]->has_wake_signal = true; +#if !defined(CV_USE_GLOBAL_WORKERS_COND_VAR) + pthread_mutex_unlock(&threads[i]->mutex); + pthread_cond_broadcast/*pthread_cond_signal*/(&threads[i]->cond_thread_wake); // wake thread +#else + pthread_mutex_unlock(&threads[i]->mutex); +#endif + std::swap(threads[i], release_threads[i - new_threads_count]); + } +#if defined(CV_USE_GLOBAL_WORKERS_COND_VAR) + CV_LOG_VERBOSE(NULL, 1, "MainThread: notify worker threads about termination..."); + pthread_cond_broadcast(&cond_thread_wake); // wake all threads +#endif + threads.resize(new_threads_count); + release_threads.clear(); // calls thread_join which want to lock mutex + return false; + } + else + { + CV_LOG_VERBOSE(NULL, 1, "MainThread: upgrade worker pool: " << threads.size() << " => " << new_threads_count); + for (size_t i = threads.size(); i < new_threads_count; ++i) + { + threads.push_back(Ptr(new WorkerThread(*this, (unsigned)i))); // spawn more threads + } + } + return false; +} + +ThreadPool::~ThreadPool() +{ + reconfigure(0); + pthread_cond_destroy(&cond_thread_task_complete); +#if defined(CV_USE_GLOBAL_WORKERS_COND_VAR) + pthread_cond_destroy(&cond_thread_wake); +#endif + pthread_mutex_destroy(&mutex); + pthread_mutex_destroy(&mutex_notify); +} + +void ThreadPool::run(const Range& range, const ParallelLoopBody& body, double nstripes) +{ + CV_LOG_VERBOSE(NULL, 1, "MainThread: new parallel job: num_threads=" << num_threads << " range=" << range.size() << " nstripes=" << nstripes << " job=" << (void*)job); +#ifdef CV_PROFILE_THREADS + jobSubmitTime = getTickCount(); + 
threads_stat[0].reset(); + threads_stat[0].threadWait = jobSubmitTime; + threads_stat[0].threadWake = jobSubmitTime; +#endif + if (getNumOfThreads() > 1 && + job == NULL && + (range.size() * nstripes >= 2 || (range.size() > 1 && nstripes <= 0)) + ) + { + pthread_mutex_lock(&mutex); + if (job != NULL) + { + pthread_mutex_unlock(&mutex); + body(range); + return; + } + reconfigure_(num_threads - 1); + + { + CV_LOG_VERBOSE(NULL, 1, "MainThread: initialize parallel job: " << range.size()); + job = Ptr(new ParallelJob(*this, range, body, nstripes)); + pthread_mutex_unlock(&mutex); + + CV_LOG_VERBOSE(NULL, 5, "MainThread: wake worker threads..."); + for (size_t i = 0; i < threads.size(); ++i) + { +#if !defined(CV_USE_GLOBAL_WORKERS_COND_VAR) + bool isActive = threads[i]->isActive; + if (isActive || threads[i]->has_wake_signal) +#else + if (threads[i]->has_wake_signal) +#endif + { + pthread_mutex_lock(&threads[i]->mutex); + threads[i]->job = job; +#if !defined(CV_USE_GLOBAL_WORKERS_COND_VAR) + isActive = threads[i]->isActive; +#endif + threads[i]->dont_wait = true; +#ifdef CV_PROFILE_THREADS + threads_stat[i + 1].reset(); +#endif + pthread_mutex_unlock(&threads[i]->mutex); + threads[i]->has_wake_signal = true; +#if !defined(CV_USE_GLOBAL_WORKERS_COND_VAR) + if (!isActive) + { + pthread_cond_broadcast/*pthread_cond_signal*/(&threads[i]->cond_thread_wake); // wake thread + } +#endif + } + else + { + CV_Assert(threads[i]->job.empty()); + threads[i]->job = job; + threads[i]->dont_wait = true; + threads[i]->has_wake_signal = true; +#ifdef CV_PROFILE_THREADS + threads_stat[i + 1].reset(); +#endif +#if !defined(CV_USE_GLOBAL_WORKERS_COND_VAR) + pthread_cond_broadcast/*pthread_cond_signal*/(&threads[i]->cond_thread_wake); // wake thread +#endif + } + } +#ifdef CV_PROFILE_THREADS + threads_stat[0].threadPing = getTickCount(); +#endif +#if defined(CV_USE_GLOBAL_WORKERS_COND_VAR) + pthread_cond_broadcast(&cond_thread_wake); // wake all threads +#endif +#ifdef CV_PROFILE_THREADS + 
threads_stat[0].threadWake = getTickCount(); +#endif + CV_LOG_VERBOSE(NULL, 5, "MainThread: wake worker threads... (done)"); + + { + ParallelJob& j = *(this->job); +#ifdef CV_PROFILE_THREADS + threads_stat[0].threadExecuteStart = getTickCount(); + threads_stat[0].executedTasks = j.execute(false); + threads_stat[0].threadExecuteStop = getTickCount(); +#else + j.execute(false); +#endif + CV_Assert(j.current_task >= j.range.size()); + CV_LOG_VERBOSE(NULL, 5, "MainThread: complete self-tasks: " << j.active_thread_count << " " << j.completed_thread_count); + if (job->is_completed || j.active_thread_count == 0) + { + job->is_completed = true; + CV_LOG_VERBOSE(NULL, 5, "MainThread: no WIP worker threads"); + } + else + { + if (CV_MAIN_THREAD_ACTIVE_WAIT > 0) + { + for (int i = 0; i < CV_MAIN_THREAD_ACTIVE_WAIT; i++) // don't spin too much in any case (inaccurate getTickCount()) + { + if (job->is_completed) + { + CV_LOG_VERBOSE(NULL, 5, "MainThread: job finalize (active wait) " << j.active_thread_count << " " << j.completed_thread_count); + break; + } + if (CV_ACTIVE_WAIT_PAUSE_LIMIT > 0 && (i < CV_ACTIVE_WAIT_PAUSE_LIMIT || (i & 1))) + CV_PAUSE(16); + else + CV_YIELD(); + } + } + if (!job->is_completed) + { + CV_LOG_VERBOSE(NULL, 5, "MainThread: prepare wait " << j.active_thread_count << " " << j.completed_thread_count); + pthread_mutex_lock(&mutex_notify); + for (;;) + { + if (job->is_completed) + { + CV_LOG_VERBOSE(NULL, 5, "MainThread: job finalize (wait) " << j.active_thread_count << " " << j.completed_thread_count); + break; + } + CV_LOG_VERBOSE(NULL, 5, "MainThread: wait completion (sleep) ..."); + pthread_cond_wait(&cond_thread_task_complete, &mutex_notify); + CV_LOG_VERBOSE(NULL, 5, "MainThread: wake"); + } + pthread_mutex_unlock(&mutex_notify); + } + } + } +#ifdef CV_PROFILE_THREADS + threads_stat[0].threadFree = getTickCount(); + std::cout << "Job: sz=" << range.size() << " nstripes=" << nstripes << " Time: " << (threads_stat[0].threadFree - jobSubmitTime) / 
tickFreq * 1e6 << " usec" << std::endl; + for (int i = 0; i < (int)threads.size() + 1; i++) + { + threads_stat[i].dump(i - 1, jobSubmitTime, tickFreq); + } +#endif + if (job) + { + pthread_mutex_lock(&mutex); + CV_LOG_VERBOSE(NULL, 5, "MainThread: job release"); + CV_Assert(job->is_completed); + job.release(); + pthread_mutex_unlock(&mutex); + } + } + } + else + { + body(range); + } +} + +size_t ThreadPool::getNumOfThreads() +{ + return num_threads; +} + +void ThreadPool::setNumOfThreads(unsigned n) +{ + if (n != num_threads) + { + num_threads = n; + if (n == 1) + if (job == NULL) reconfigure(0); // stop worker threads immediatelly + } +} + +size_t parallel_pthreads_get_threads_num() +{ + return ThreadPool::instance().getNumOfThreads(); +} + +void parallel_pthreads_set_threads_num(int num) +{ + if(num < 0) + { + ThreadPool::instance().setNumOfThreads(0); + } + else + { + ThreadPool::instance().setNumOfThreads(unsigned(num)); + } +} + +void parallel_for_pthreads(const Range& range, const ParallelLoopBody& body, double nstripes) +{ + ThreadPool::instance().run(range, body, nstripes); +} + +} + +#endif diff --git a/modules/core/src/parallel_impl.hpp b/modules/core/src/parallel_impl.hpp new file mode 100644 index 0000000000..fc199a86b9 --- /dev/null +++ b/modules/core/src/parallel_impl.hpp @@ -0,0 +1,17 @@ +// This file is part of OpenCV project. +// It is subject to the license terms in the LICENSE file found in the top-level directory +// of this distribution and at http://opencv.org/license.html. 
+#ifndef OPENCV_CORE_PARALLEL_IMPL_HPP +#define OPENCV_CORE_PARALLEL_IMPL_HPP + +namespace cv { + +unsigned defaultNumberOfThreads(); + +void parallel_for_pthreads(const Range& range, const ParallelLoopBody& body, double nstripes); +size_t parallel_pthreads_get_threads_num(); +void parallel_pthreads_set_threads_num(int num); + +} + +#endif // OPENCV_CORE_PARALLEL_IMPL_HPP diff --git a/modules/core/src/parallel_pthreads.cpp b/modules/core/src/parallel_pthreads.cpp deleted file mode 100644 index b571ef08b3..0000000000 --- a/modules/core/src/parallel_pthreads.cpp +++ /dev/null @@ -1,581 +0,0 @@ -/*M/////////////////////////////////////////////////////////////////////////////////////// -// -// IMPORTANT: READ BEFORE DOWNLOADING, COPYING, INSTALLING OR USING. -// -// By downloading, copying, installing or using the software you agree to this license. -// If you do not agree to this license, do not download, install, -// copy or use the software. -// -// -// License Agreement -// For Open Source Computer Vision Library -// -// Copyright (C) 2000-2008, Intel Corporation, all rights reserved. -// Copyright (C) 2009-2011, Willow Garage Inc., all rights reserved. -// Third party copyrights are property of their respective owners. -// -// Redistribution and use in source and binary forms, with or without modification, -// are permitted provided that the following conditions are met: -// -// * Redistribution's of source code must retain the above copyright notice, -// this list of conditions and the following disclaimer. -// -// * Redistribution's in binary form must reproduce the above copyright notice, -// this list of conditions and the following disclaimer in the documentation -// and/or other materials provided with the distribution. -// -// * The name of the copyright holders may not be used to endorse or promote products -// derived from this software without specific prior written permission. 
-// -// This software is provided by the copyright holders and contributors "as is" and -// any express or implied warranties, including, but not limited to, the implied -// warranties of merchantability and fitness for a particular purpose are disclaimed. -// In no event shall the Intel Corporation or contributors be liable for any direct, -// indirect, incidental, special, exemplary, or consequential damages -// (including, but not limited to, procurement of substitute goods or services; -// loss of use, data, or profits; or business interruption) however caused -// and on any theory of liability, whether in contract, strict liability, -// or tort (including negligence or otherwise) arising in any way out of -// the use of this software, even if advised of the possibility of such damage. -// -//M*/ - -#include "precomp.hpp" - -#ifdef HAVE_PTHREADS_PF - -#include -#include - -namespace cv -{ - -class ThreadManager; - -enum ForThreadState -{ - eFTNotStarted = 0, - eFTStarted = 1, - eFTToStop = 2, - eFTStoped = 3 -}; - -enum ThreadManagerPoolState -{ - eTMNotInited = 0, - eTMFailedToInit = 1, - eTMInited = 2, - eTMSingleThreaded = 3 -}; - -struct work_load -{ - work_load() - { - clear(); - } - - work_load(const cv::Range& range, const cv::ParallelLoopBody& body, int nstripes) - { - set(range, body, nstripes); - } - - void set(const cv::Range& range, const cv::ParallelLoopBody& body, unsigned int nstripes) - { - m_body = &body; - m_range = ⦥ - - //ensure that nstripes not larger than range length - m_nstripes = std::min( unsigned(m_range->end - m_range->start) , nstripes); - - m_block_size = ((m_range->end - m_range->start - 1)/m_nstripes) + 1; - - //ensure that nstripes not larger than blocks count, so we would never go out of range - m_nstripes = std::min(m_nstripes, unsigned(((m_range->end - m_range->start - 1)/m_block_size) + 1) ); - } - - const cv::ParallelLoopBody* m_body; - const cv::Range* m_range; - unsigned int m_nstripes; - int m_block_size; - - void 
clear() - { - m_body = 0; - m_range = 0; - m_nstripes = 0; - m_block_size = 0; - } -}; - -class ForThread -{ -public: - - ForThread(): m_posix_thread(0), m_task_start(false), m_parent(0), m_state(eFTNotStarted), m_id(0) - { - } - - //called from manager thread - bool init(size_t id, ThreadManager* parent); - - //called from manager thread - void run(); - - //called from manager thread - void stop(); - - ~ForThread(); - -private: - - //called from worker thread - static void* thread_loop_wrapper(void* thread_object); - - //called from worker thread - void execute(); - - //called from worker thread - void thread_body(); - - pthread_t m_posix_thread; - pthread_mutex_t m_thread_mutex; - pthread_cond_t m_cond_thread_task; - volatile bool m_task_start; - - ThreadManager* m_parent; - volatile ForThreadState m_state; - size_t m_id; -}; - -class ThreadManager -{ -public: - friend class ForThread; - - static ThreadManager& instance() - { - CV_SINGLETON_LAZY_INIT_REF(ThreadManager, new ThreadManager()) - } - - static void stop() - { - ThreadManager& manager = instance(); - - if(manager.m_pool_state == eTMInited) - { - for(size_t i = 0; i < manager.m_num_threads; ++i) - { - manager.m_threads[i].stop(); - } - } - - manager.m_pool_state = eTMNotInited; - } - - void run(const cv::Range& range, const cv::ParallelLoopBody& body, double nstripes); - - size_t getNumOfThreads(); - - void setNumOfThreads(size_t n); - -private: - - ThreadManager(); - - ~ThreadManager(); - - void wait_complete(); - - void notify_complete(); - - bool initPool(); - - size_t defaultNumberOfThreads(); - - std::vector m_threads; - size_t m_num_threads; - - pthread_mutex_t m_manager_task_mutex; - pthread_cond_t m_cond_thread_task_complete; - bool m_task_complete; - - unsigned int m_task_position; - unsigned int m_num_of_completed_tasks; - - pthread_mutex_t m_manager_access_mutex; - - static const char m_env_name[]; - - work_load m_work_load; - - struct work_thread_t - { - work_thread_t(): value(false) { } - 
bool value; - }; - - cv::TLSData m_is_work_thread; - - ThreadManagerPoolState m_pool_state; -}; - -const char ThreadManager::m_env_name[] = "OPENCV_FOR_THREADS_NUM"; - -ForThread::~ForThread() -{ - if(m_state == eFTStarted) - { - stop(); - - pthread_mutex_destroy(&m_thread_mutex); - - pthread_cond_destroy(&m_cond_thread_task); - } -} - -bool ForThread::init(size_t id, ThreadManager* parent) -{ - m_id = id; - - m_parent = parent; - - int res = 0; - - res |= pthread_mutex_init(&m_thread_mutex, NULL); - - res |= pthread_cond_init(&m_cond_thread_task, NULL); - - if(!res) - { - res = pthread_create(&m_posix_thread, NULL, thread_loop_wrapper, (void*)this); - } - - - return res == 0; -} - -void ForThread::stop() -{ - if(m_state == eFTStarted) - { - pthread_mutex_lock(&m_thread_mutex); - m_state = eFTToStop; - pthread_mutex_unlock(&m_thread_mutex); - - run(); - - pthread_join(m_posix_thread, NULL); - } - - pthread_mutex_lock(&m_thread_mutex); - m_state = eFTStoped; - pthread_mutex_unlock(&m_thread_mutex); -} - -void ForThread::run() -{ - pthread_mutex_lock(&m_thread_mutex); - - m_task_start = true; - - pthread_cond_signal(&m_cond_thread_task); - - pthread_mutex_unlock(&m_thread_mutex); -} - -void* ForThread::thread_loop_wrapper(void* thread_object) -{ - ((ForThread*)thread_object)->thread_body(); - return 0; -} - -void ForThread::execute() -{ - unsigned int m_current_pos = CV_XADD(&m_parent->m_task_position, 1); - - work_load& load = m_parent->m_work_load; - - while(m_current_pos < load.m_nstripes) - { - int start = load.m_range->start + m_current_pos*load.m_block_size; - int end = std::min(start + load.m_block_size, load.m_range->end); - - load.m_body->operator()(cv::Range(start, end)); - - m_current_pos = CV_XADD(&m_parent->m_task_position, 1); - } -} - -void ForThread::thread_body() -{ - (void)cv::utils::getThreadID(); // notify OpenCV about new thread - - m_parent->m_is_work_thread.get()->value = true; - - pthread_mutex_lock(&m_thread_mutex); - - m_state = eFTStarted; 
- - while(m_state == eFTStarted) - { - //to handle spurious wakeups - while( !m_task_start && m_state != eFTToStop ) - pthread_cond_wait(&m_cond_thread_task, &m_thread_mutex); - - if(m_state == eFTStarted) - { - execute(); - - m_task_start = false; - - m_parent->notify_complete(); - } - } - - pthread_mutex_unlock(&m_thread_mutex); -} - -ThreadManager::ThreadManager(): m_num_threads(0), m_task_complete(false), m_num_of_completed_tasks(0), m_pool_state(eTMNotInited) -{ - int res = 0; - - pthread_mutexattr_t attr; - pthread_mutexattr_init(&attr); - pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE); - res |= pthread_mutex_init(&m_manager_access_mutex, &attr); - pthread_mutexattr_destroy(&attr); - - res |= pthread_mutex_init(&m_manager_task_mutex, NULL); - - res |= pthread_cond_init(&m_cond_thread_task_complete, NULL); - - if(!res) - { - setNumOfThreads(defaultNumberOfThreads()); - - m_task_position = 0; - } - else - { - m_num_threads = 1; - m_pool_state = eTMFailedToInit; - m_task_position = 0; - - //print error; - } -} - -ThreadManager::~ThreadManager() -{ - stop(); - - pthread_mutex_destroy(&m_manager_task_mutex); - - pthread_cond_destroy(&m_cond_thread_task_complete); - - pthread_mutex_destroy(&m_manager_access_mutex); -} - -void ThreadManager::run(const cv::Range& range, const cv::ParallelLoopBody& body, double nstripes) -{ - bool is_work_thread = m_is_work_thread.get()->value; - - if( (getNumOfThreads() > 1) && !is_work_thread && - (range.end - range.start > 1) && (nstripes <= 0 || nstripes >= 1.5) ) - { - int res = pthread_mutex_trylock(&m_manager_access_mutex); - - if(!res) - { - if(initPool()) - { - if(nstripes < 1) nstripes = 4*m_threads.size(); - - double max_stripes = 4*m_threads.size(); - - nstripes = std::min(nstripes, max_stripes); - - pthread_mutex_lock(&m_manager_task_mutex); - - m_num_of_completed_tasks = 0; - - m_task_position = 0; - - m_task_complete = false; - - m_work_load.set(range, body, cvCeil(nstripes)); - - for(size_t i = 0; i < 
m_threads.size(); ++i) - { - m_threads[i].run(); - } - - wait_complete(); - } - else - { - //print error - body(range); - } - } - else - { - body(range); - } - } - else - { - body(range); - } -} - -void ThreadManager::wait_complete() -{ - //to handle spurious wakeups - while(!m_task_complete) - pthread_cond_wait(&m_cond_thread_task_complete, &m_manager_task_mutex); - - pthread_mutex_unlock(&m_manager_task_mutex); - - pthread_mutex_unlock(&m_manager_access_mutex); -} - -void ThreadManager::notify_complete() -{ - - unsigned int comp = CV_XADD(&m_num_of_completed_tasks, 1); - - if(comp == (m_num_threads - 1)) - { - pthread_mutex_lock(&m_manager_task_mutex); - - m_task_complete = true; - - pthread_cond_signal(&m_cond_thread_task_complete); - - pthread_mutex_unlock(&m_manager_task_mutex); - } -} - -bool ThreadManager::initPool() -{ - if(m_pool_state != eTMNotInited || m_num_threads == 1) - return true; - - m_threads.resize(m_num_threads); - - bool res = true; - - for(size_t i = 0; i < m_threads.size(); ++i) - { - res |= m_threads[i].init(i, this); - } - - if(res) - { - m_pool_state = eTMInited; - } - else - { - //TODO: join threads? - m_pool_state = eTMFailedToInit; - } - - return res; -} - -size_t ThreadManager::getNumOfThreads() -{ - return m_num_threads; -} - -void ThreadManager::setNumOfThreads(size_t n) -{ - int res = pthread_mutex_lock(&m_manager_access_mutex); - - if(!res) - { - if(n == 0) - { - n = defaultNumberOfThreads(); - } - - if(n != m_num_threads && m_pool_state != eTMFailedToInit) - { - if(m_pool_state == eTMInited) - { - stop(); - m_threads.clear(); - } - - m_num_threads = n; - - if(m_num_threads == 1) - { - m_pool_state = eTMSingleThreaded; - } - else - { - m_pool_state = eTMNotInited; - } - } - - pthread_mutex_unlock(&m_manager_access_mutex); - } -} - -size_t ThreadManager::defaultNumberOfThreads() -{ -#ifdef __ANDROID__ - // many modern phones/tables have 4-core CPUs. 
Let's use no more - // than 2 threads by default not to overheat the devices - const unsigned int default_number_of_threads = 2; -#else - const unsigned int default_number_of_threads = (unsigned int)std::max(1, cv::getNumberOfCPUs()); -#endif - - unsigned int result = default_number_of_threads; - - char * env = getenv(m_env_name); - - if(env != NULL) - { - sscanf(env, "%u", &result); - - result = std::max(1u, result); - //do we need upper limit of threads number? - } - - return result; -} - -void parallel_for_pthreads(const cv::Range& range, const cv::ParallelLoopBody& body, double nstripes); -size_t parallel_pthreads_get_threads_num(); -void parallel_pthreads_set_threads_num(int num); - -size_t parallel_pthreads_get_threads_num() -{ - return ThreadManager::instance().getNumOfThreads(); -} - -void parallel_pthreads_set_threads_num(int num) -{ - if(num < 0) - { - ThreadManager::instance().setNumOfThreads(0); - } - else - { - ThreadManager::instance().setNumOfThreads(size_t(num)); - } -} - -void parallel_for_pthreads(const cv::Range& range, const cv::ParallelLoopBody& body, double nstripes) -{ - ThreadManager::instance().run(range, body, nstripes); -} - -} - -#endif