///////////////////////////////////////////////////////////////////////////
//
// Copyright (c) 2005-2012, Industrial Light & Magic, a division of Lucas
// Digital Ltd. LLC
//
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Industrial Light & Magic nor the names of
// its contributors may be used to endorse or promote products derived
// from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//
///////////////////////////////////////////////////////////////////////////
//-----------------------------------------------------------------------------
//
// class Task, class ThreadPool, class TaskGroup
//
//-----------------------------------------------------------------------------
#include "IlmThread.h"
#include "IlmThreadMutex.h"
#include "IlmThreadSemaphore.h"
#include "IlmThreadPool.h"
#include "Iex.h"
#include <vector>
#ifndef ILMBASE_FORCE_CXX03
# include <memory>
# include <atomic>
# include <thread>
#endif
using namespace std;
ILMTHREAD_INTERNAL_NAMESPACE_SOURCE_ENTER
#if defined(__GNU_LIBRARY__) && ( __GLIBC__ < 2 || ( __GLIBC__ == 2 && __GLIBC_MINOR__ < 21 ) )
# define ENABLE_SEM_DTOR_WORKAROUND
#endif
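//
// A minimal usage sketch (illustrative only and kept out of the build;
// "FrameTask", "processAllFrames" and the frame count are hypothetical):
// tasks subclass Task, the pool owns and deletes each task once added,
// and a TaskGroup's destructor blocks until every task added against it
// has finished.
//
#if 0
struct FrameTask : Task
{
    FrameTask (TaskGroup *g, int frame) : Task (g), _frame (frame) {}
    virtual void execute () { /* ... process frame _frame ... */ }
    int _frame;
};

void
processAllFrames ()
{
    TaskGroup group;

    for (int frame = 0; frame < 100; ++frame)
        ThreadPool::addGlobalTask (new FrameTask (&group, frame));

    // leaving this scope waits for all 100 tasks to complete
}
#endif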
struct TaskGroup::Data
{
    Data ();
    ~Data ();

    void addTask ();
    void removeTask ();

#ifndef ILMBASE_FORCE_CXX03
    std::atomic<int> numPending; // number of pending tasks still to execute
#else
    int numPending;              // number of pending tasks still to execute
#endif
    Semaphore isEmpty;           // used to signal that the task group is empty

#if defined(ENABLE_SEM_DTOR_WORKAROUND) || defined(ILMBASE_FORCE_CXX03)
    // this mutex is also used to lock numPending in the legacy c++ mode...
    Mutex dtorMutex;             // used to work around the glibc bug:
                                 // http://sources.redhat.com/bugzilla/show_bug.cgi?id=12674
#endif
};
struct ThreadPool::Data
{
    typedef ThreadPoolProvider *TPPointer;

    Data ();
    ~Data ();

    struct SafeProvider
    {
        SafeProvider (Data *d, ThreadPoolProvider *p) : _data( d ), _ptr( p )
        {
        }

        ~SafeProvider ()
        {
            if ( _data )
                _data->coalesceProviderUse();
        }

        SafeProvider (const SafeProvider &o)
            : _data( o._data ), _ptr( o._ptr )
        {
            if ( _data )
                _data->bumpProviderUse();
        }

        SafeProvider &operator= (const SafeProvider &o)
        {
            if ( this != &o )
            {
                if ( o._data )
                    o._data->bumpProviderUse();
                if ( _data )
                    _data->coalesceProviderUse();
                _data = o._data;
                _ptr = o._ptr;
            }
            return *this;
        }

#ifndef ILMBASE_FORCE_CXX03
        SafeProvider (SafeProvider &&o)
            : _data( o._data ), _ptr( o._ptr )
        {
            o._data = nullptr;
        }

        SafeProvider &operator= (SafeProvider &&o)
        {
            std::swap( _data, o._data );
            std::swap( _ptr, o._ptr );
            return *this;
        }
#endif

        inline ThreadPoolProvider *get () const
        {
            return _ptr;
        }

        ThreadPoolProvider *operator-> () const
        {
            return get();
        }

        Data *_data;
        ThreadPoolProvider *_ptr;
    };

    // NB: In C++20, there is full support for atomic shared_ptr, but that is
    // not yet in use or finalized. Once stabilized, add appropriate usage here.
    inline SafeProvider getProvider ();
    inline void coalesceProviderUse ();
    inline void bumpProviderUse ();
    inline void setProvider (ThreadPoolProvider *p);

#ifdef ILMBASE_FORCE_CXX03
    Semaphore provSem;
    Mutex provMutex;
    int provUsers;
    ThreadPoolProvider *provider;
    ThreadPoolProvider *oldprovider;
#else
    // declared in this order so the constructor's initializer list
    // (provUsers, then provider) matches the declaration order
    std::atomic<int> provUsers;
    std::atomic<ThreadPoolProvider *> provider;
#endif
};
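//
// How SafeProvider is intended to be used (a sketch for exposition only,
// not compiled; "queryThreadCount" and the Data instance "d" are
// hypothetical): getProvider() bumps provUsers before handing out the raw
// pointer, and the guard's destructor decrements it again, so that
// setProvider() can wait until the old provider is unreferenced before
// deleting it.
//
#if 0
int
queryThreadCount (ThreadPool::Data &d)
{
    ThreadPool::Data::SafeProvider sp = d.getProvider ();
    return sp->numThreads ();
    // sp goes out of scope here, releasing the use count via
    // coalesceProviderUse()
}
#endif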
namespace {

class DefaultWorkerThread;

struct DefaultWorkData
{
    Semaphore taskSemaphore;    // threads wait on this for ready tasks
    mutable Mutex taskMutex;    // mutual exclusion for the tasks list
    vector<Task*> tasks;        // the list of tasks to execute

    Semaphore threadSemaphore;  // signaled when a thread starts executing
    mutable Mutex threadMutex;  // mutual exclusion for the threads list
    vector<DefaultWorkerThread*> threads;  // the list of all threads

#ifdef ILMBASE_FORCE_CXX03
    bool stopping;              // flag indicating whether to stop threads
    mutable Mutex stopMutex;    // mutual exclusion for the stopping flag
#else
    std::atomic<bool> hasThreads;
    std::atomic<bool> stopping;
#endif

    inline bool stopped () const
    {
#ifdef ILMBASE_FORCE_CXX03
        Lock lock (stopMutex);
        return stopping;
#else
        return stopping.load( std::memory_order_relaxed );
#endif
    }

    inline void stop ()
    {
#ifdef ILMBASE_FORCE_CXX03
        Lock lock (stopMutex);
#endif
        stopping = true;
    }
};
//
// class WorkerThread
//

class DefaultWorkerThread: public Thread
{
  public:

    DefaultWorkerThread (DefaultWorkData* data);

    virtual void run ();

  private:

    DefaultWorkData *_data;
};

DefaultWorkerThread::DefaultWorkerThread (DefaultWorkData* data):
    _data (data)
{
    start();
}
void
DefaultWorkerThread::run ()
{
    //
    // Signal that the thread has started executing
    //

    _data->threadSemaphore.post();

    while (true)
    {
        //
        // Wait for a task to become available
        //

        _data->taskSemaphore.wait();

        {
            Lock taskLock (_data->taskMutex);

            //
            // If there is a task pending, pop the next task off the back
            // of the list (note: tasks are therefore taken in LIFO order)
            //

            if (!_data->tasks.empty())
            {
                Task* task = _data->tasks.back();
                _data->tasks.pop_back();
                taskLock.release();

                TaskGroup* taskGroup = task->group();
                task->execute();

                delete task;

                taskGroup->_data->removeTask ();
            }
            else if (_data->stopped())
            {
                break;
            }
        }
    }
}
//
// class DefaultThreadPoolProvider
//

class DefaultThreadPoolProvider : public ThreadPoolProvider
{
  public:
    DefaultThreadPoolProvider (int count);
    virtual ~DefaultThreadPoolProvider ();

    virtual int numThreads () const;
    virtual void setNumThreads (int count);
    virtual void addTask (Task *task);

    virtual void finish ();

  private:
    DefaultWorkData _data;
};

DefaultThreadPoolProvider::DefaultThreadPoolProvider (int count)
{
    setNumThreads (count);
}

DefaultThreadPoolProvider::~DefaultThreadPoolProvider ()
{
    finish ();
}

int
DefaultThreadPoolProvider::numThreads () const
{
    Lock lock (_data.threadMutex);
    return static_cast<int> (_data.threads.size());
}
void
DefaultThreadPoolProvider::setNumThreads (int count)
{
    //
    // Lock access to the thread list and its size
    //

    Lock lock (_data.threadMutex);

    size_t desired = static_cast<size_t>(count);
    if (desired > _data.threads.size())
    {
        //
        // Add more threads
        //

        while (_data.threads.size() < desired)
            _data.threads.push_back (new DefaultWorkerThread (&_data));
    }
    else if (desired < _data.threads.size())
    {
        //
        // Wait until all existing threads are finished processing,
        // then delete all threads.
        //

        finish ();

        //
        // Add in new threads
        //

        while (_data.threads.size() < desired)
            _data.threads.push_back (new DefaultWorkerThread (&_data));
    }

#ifndef ILMBASE_FORCE_CXX03
    _data.hasThreads = !(_data.threads.empty());
#endif
}
void
DefaultThreadPoolProvider::addTask (Task *task)
{
    //
    // Check whether the pool has any threads; in the legacy mode this
    // requires locking the thread list
    //
#ifdef ILMBASE_FORCE_CXX03
    bool doPush;
    {
        Lock lock (_data.threadMutex);
        doPush = !_data.threads.empty();
    }
#else
    bool doPush = _data.hasThreads.load( std::memory_order_relaxed );
#endif

    if ( doPush )
    {
        //
        // Get exclusive access to the tasks queue
        //

        {
            Lock taskLock (_data.taskMutex);

            //
            // Push the new task onto the back of the list
            //

            _data.tasks.push_back (task);
        }

        //
        // Signal that we have a new task to process
        //

        _data.taskSemaphore.post ();
    }
    else
    {
        // this path shouldn't normally happen, since a pool with zero
        // threads uses the NullThreadPoolProvider, but just in case...
        task->execute ();
        task->group()->_data->removeTask ();
        delete task;
    }
}
void
DefaultThreadPoolProvider::finish ()
{
    _data.stop();

    //
    // Signal the task semaphore enough times to allow all threads to
    // observe the stop flag and exit.
    //
    // Also wait until every thread has entered its run() function: if
    // we destroy a thread before its run() has started, run() would be
    // called on an invalid object and we would crash, most likely with
    // an error like "pure virtual method called".
    //

    size_t curT = _data.threads.size();
    for (size_t i = 0; i != curT; ++i)
    {
        _data.taskSemaphore.post();
        _data.threadSemaphore.wait();
    }

    //
    // Join all the threads
    //

    for (size_t i = 0; i != curT; ++i)
        delete _data.threads[i];

    Lock lock1 (_data.taskMutex);
#ifdef ILMBASE_FORCE_CXX03
    Lock lock2 (_data.stopMutex);
#endif
    _data.threads.clear();
    _data.tasks.clear();
    _data.stopping = false;
}
class NullThreadPoolProvider : public ThreadPoolProvider
{
    virtual ~NullThreadPoolProvider () {}
    virtual int numThreads () const { return 0; }
    virtual void setNumThreads (int count)
    {
    }
    virtual void addTask (Task *t)
    {
        t->execute ();
        t->group ()->_data->removeTask ();
        delete t;
    }
    virtual void finish () {}
};

} //namespace
//
// struct TaskGroup::Data
//

TaskGroup::Data::Data (): numPending (0), isEmpty (1)
{
    // empty
}
TaskGroup::Data::~Data ()
{
    //
    // A TaskGroup acts like an "inverted" semaphore: if the count
    // is above 0 then waiting on the taskgroup will block. This
    // destructor waits until the taskgroup is empty before returning.
    //

    isEmpty.wait ();

#ifdef ENABLE_SEM_DTOR_WORKAROUND
    // Update: this was fixed in glibc 2.21, so the ifdef at the top of
    // this file enables the workaround only for older versions.
    //
    // Alas, given the bug in older glibc we need a secondary
    // synchronisation primitive here to account for the fact that
    // destructing the isEmpty Semaphore in this thread can cause
    // an error for a separate thread that is issuing the post() call.
    // We are entitled to destruct the semaphore at this point, however,
    // that post() call attempts to access data out of the associated
    // memory *after* it has woken the waiting threads, including this one,
    // potentially leading to invalid memory reads.
    // http://sources.redhat.com/bugzilla/show_bug.cgi?id=12674

    Lock lock (dtorMutex);
#endif
}
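//
// The "inverted semaphore" pattern above, in isolation (a sketch for
// exposition only, not compiled; "InvertedSemaphore" is a hypothetical
// name): the token starts available, the first up() claims it, and the
// last down() returns it, so waitUntilZero() blocks exactly while the
// count is above zero. addTask(), removeTask() and ~Data() below follow
// this shape.
//
#if 0
struct InvertedSemaphore
{
    InvertedSemaphore (): count (0), empty (1) {}

    void up ()   { if (count++ == 0) empty.wait (); } // 0 -> 1: claim the token
    void down () { if (--count == 0) empty.post (); } // 1 -> 0: return the token

    void waitUntilZero () { empty.wait (); }          // blocks while count > 0

    std::atomic<int> count;
    Semaphore        empty;
};
#endif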
void
TaskGroup::Data::addTask ()
{
    //
    // In C++11 we use an atomic to protect numPending and avoid the
    // extra lock; in C++98, to support custom thread pools, we take
    // the lock here.
    //
#ifdef ILMBASE_FORCE_CXX03
    Lock lock (dtorMutex);
#endif
    if (numPending++ == 0)
        isEmpty.wait ();
}
void
TaskGroup::Data::removeTask ()
{
    //
    // Given the bug in older glibc (see ~Data above) we need a secondary
    // synchronisation primitive here to account for the fact that
    // destructing the isEmpty Semaphore in a separate thread can cause
    // an error: the buggy post() implementation accesses the semaphore's
    // memory *after* it has woken waiting threads, and since other
    // threads are entitled to delete the semaphore by then, that access
    // can be invalid.
    // http://sources.redhat.com/bugzilla/show_bug.cgi?id=12674
    //
    // The bug is fixed in glibc 2.21 (see the version check at the top
    // of this file). In C++98 mode we need the lock anyway to provide
    // safe access to numPending now that the API allows a custom
    // thread-pool override; in C++11 or newer the atomic suffices.
    //
#ifdef ILMBASE_FORCE_CXX03
    Lock lock (dtorMutex);
    if (--numPending == 0)
        isEmpty.post ();
#else
    if (--numPending == 0)
    {
#ifdef ENABLE_SEM_DTOR_WORKAROUND
        Lock lock (dtorMutex);
#endif
        isEmpty.post ();
    }
#endif
}
//
// struct ThreadPool::Data
//

ThreadPool::Data::Data ():
    provUsers (0), provider (NULL)
#ifdef ILMBASE_FORCE_CXX03
    , oldprovider (NULL)
#endif
{
    // empty
}

ThreadPool::Data::~Data ()
{
    //
    // Finish and reclaim the last provider; setProvider deletes any
    // previously replaced one.
    //
#ifdef ILMBASE_FORCE_CXX03
    provider->finish();
    delete provider;
#else
    ThreadPoolProvider *p = provider.load( std::memory_order_relaxed );
    p->finish();
    delete p;
#endif
}
inline ThreadPool::Data::SafeProvider
ThreadPool::Data::getProvider ()
{
#ifdef ILMBASE_FORCE_CXX03
    Lock provLock( provMutex );
    ++provUsers;
    return SafeProvider( this, provider );
#else
    provUsers.fetch_add( 1, std::memory_order_relaxed );
    return SafeProvider( this, provider.load( std::memory_order_relaxed ) );
#endif
}

inline void
ThreadPool::Data::coalesceProviderUse ()
{
#ifdef ILMBASE_FORCE_CXX03
    Lock provLock( provMutex );
    --provUsers;
    if ( provUsers == 0 )
    {
        if ( oldprovider )
            provSem.post();
    }
#else
    int ov = provUsers.fetch_sub( 1, std::memory_order_relaxed );
    // ov is the previous value, so 1 means the count may now be 0.
    // Nothing further to do in the C++11 path: setProvider spins on
    // provUsers itself rather than waiting on a semaphore.
    if ( ov == 1 )
    {
    }
#endif
}

inline void
ThreadPool::Data::bumpProviderUse ()
{
#ifdef ILMBASE_FORCE_CXX03
    Lock lock (provMutex);
    ++provUsers;
#else
    provUsers.fetch_add( 1, std::memory_order_relaxed );
#endif
}
inline void
ThreadPool::Data::setProvider (ThreadPoolProvider *p)
{
#ifdef ILMBASE_FORCE_CXX03
    Lock provLock( provMutex );

    if ( oldprovider )
        throw IEX_INTERNAL_NAMESPACE::ArgExc ("Attempt to set the thread pool provider while"
                                              " another thread is currently setting the provider.");

    oldprovider = provider;
    provider = p;

    while ( provUsers > 0 )
    {
        provLock.release();
        provSem.wait();
        provLock.acquire();
    }

    if ( oldprovider )
    {
        oldprovider->finish();
        delete oldprovider;
        oldprovider = NULL;
    }
#else
    //
    // Atomically swap in the new provider; a failed compare_exchange_weak
    // refreshes 'old', so retry until the swap succeeds
    //
    ThreadPoolProvider *old = provider.load( std::memory_order_relaxed );
    while ( ! provider.compare_exchange_weak( old, p, std::memory_order_release, std::memory_order_relaxed ) )
        ;

    //
    // Wait for any other users to finish prior to deleting. Given that
    // these users mostly just query the thread count or push a task to
    // the queue (so are fast), just spin...
    //
    // (Normally people don't replace the provider mid-stream anyway,
    // so this will be 0 nearly all of the time, but just to be safe.)
    //
    while ( provUsers.load( std::memory_order_relaxed ) > 0 )
        std::this_thread::yield();

    if ( old )
    {
        old->finish();
        delete old;
    }

    // NB: the shared_ptr mechanism is safer and would let us drop the
    // provUsers counter, since the shared_ptr keeps that count for us.
    // However, the gcc 4.8/4.9 compilers that many people still use
    // omitted the atomic shared_ptr functions... once those compilers
    // are fully deprecated, switch to the code below, change provider
    // to a std::shared_ptr, and remove provUsers...
    //
    // std::shared_ptr<ThreadPoolProvider> newp( p );
    // std::shared_ptr<ThreadPoolProvider> curp = std::atomic_load_explicit( &provider, std::memory_order_relaxed );
    // while ( ! std::atomic_compare_exchange_weak_explicit( &provider, &curp, newp, std::memory_order_release, std::memory_order_relaxed ) )
    //     ;
    // if ( curp )
    //     curp->finish();
#endif
}
//
// class Task
//

Task::Task (TaskGroup* g): _group(g)
{
    if ( g )
        g->_data->addTask ();
}

Task::~Task()
{
    // empty
}

TaskGroup*
Task::group ()
{
    return _group;
}

TaskGroup::TaskGroup ():
    _data (new Data())
{
    // empty
}

TaskGroup::~TaskGroup ()
{
    delete _data;
}

void
TaskGroup::finishOneTask ()
{
    _data->removeTask ();
}
//
// class ThreadPoolProvider
//

ThreadPoolProvider::ThreadPoolProvider ()
{
}

ThreadPoolProvider::~ThreadPoolProvider ()
{
}

//
// class ThreadPool
//

ThreadPool::ThreadPool (unsigned nthreads):
    _data (new Data)
{
    if ( nthreads == 0 )
        _data->setProvider( new NullThreadPoolProvider );
    else
        _data->setProvider( new DefaultThreadPoolProvider( int(nthreads) ) );
}

ThreadPool::~ThreadPool ()
{
    delete _data;
}
int
ThreadPool::numThreads () const
{
    return _data->getProvider ()->numThreads ();
}

void
ThreadPool::setNumThreads (int count)
{
    if (count < 0)
        throw IEX_INTERNAL_NAMESPACE::ArgExc ("Attempt to set the number of threads "
                                              "in a thread pool to a negative value.");

    //
    // If we are switching between a zero and a non-zero thread count,
    // replace the provider (Null <-> Default); otherwise just resize
    // the current provider.
    //
    bool doReset = false;
    {
        Data::SafeProvider sp = _data->getProvider ();
        int curT = sp->numThreads ();
        if ( curT == count )
            return;

        if ( curT == 0 )
        {
            NullThreadPoolProvider *npp = dynamic_cast<NullThreadPoolProvider *>( sp.get() );
            if ( npp )
                doReset = true;
        }
        else if ( count == 0 )
        {
            DefaultThreadPoolProvider *dpp = dynamic_cast<DefaultThreadPoolProvider *>( sp.get() );
            if ( dpp )
                doReset = true;
        }
        if ( ! doReset )
            sp->setNumThreads( count );
    }

    if ( doReset )
    {
        if ( count == 0 )
            _data->setProvider( new NullThreadPoolProvider );
        else
            _data->setProvider( new DefaultThreadPoolProvider( count ) );
    }
}
void
ThreadPool::setThreadProvider (ThreadPoolProvider *provider)
{
    _data->setProvider (provider);
}
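//
// Sketch of a custom provider (illustrative only, not compiled;
// "InlineProvider" is a hypothetical name): an override must run each
// task's execute(), notify the task's group via
// TaskGroup::finishOneTask(), and delete the task, mirroring
// NullThreadPoolProvider above. The pool takes ownership and will
// finish() and delete the provider when it is replaced or the pool is
// destroyed.
//
#if 0
class InlineProvider : public ThreadPoolProvider
{
  public:
    virtual ~InlineProvider () {}
    virtual int  numThreads () const { return 0; }
    virtual void setNumThreads (int) {}
    virtual void finish () {}
    virtual void addTask (Task *t)
    {
        t->execute ();
        t->group ()->finishOneTask ();
        delete t;
    }
};

// installed via:
// ThreadPool::globalThreadPool().setThreadProvider (new InlineProvider);
#endif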
void
ThreadPool::addTask (Task* task)
{
    _data->getProvider ()->addTask (task);
}

ThreadPool&
ThreadPool::globalThreadPool ()
{
    //
    // The global thread pool
    //

    static ThreadPool gThreadPool (0);

    return gThreadPool;
}

void
ThreadPool::addGlobalTask (Task* task)
{
    globalThreadPool().addTask (task);
}

ILMTHREAD_INTERNAL_NAMESPACE_SOURCE_EXIT