///////////////////////////////////////////////////////////////////////////
//
// Copyright (c) 2005-2012, Industrial Light & Magic, a division of Lucas
// Digital Ltd. LLC
//
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
// *       Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// *       Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// *       Neither the name of Industrial Light & Magic nor the names of
// its contributors may be used to endorse or promote products derived
// from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//
///////////////////////////////////////////////////////////////////////////

//-----------------------------------------------------------------------------
//
//      class Task, class ThreadPool, class TaskGroup
//
//-----------------------------------------------------------------------------

#include "IlmThread.h"
#include "IlmThreadMutex.h"
#include "IlmThreadSemaphore.h"
#include "IlmThreadPool.h"
#include "Iex.h"
#include <vector>
#ifndef ILMBASE_FORCE_CXX03
# include <memory>
# include <atomic>
# include <thread>
#endif

using namespace std;

ILMTHREAD_INTERNAL_NAMESPACE_SOURCE_ENTER

#if defined(__GNU_LIBRARY__) && ( __GLIBC__ < 2 || ( __GLIBC__ == 2 && __GLIBC_MINOR__ < 21 ) )
# define ENABLE_SEM_DTOR_WORKAROUND
#endif

struct TaskGroup::Data
{
     Data ();
    ~Data ();

    void    addTask ();
    void    removeTask ();

#ifndef ILMBASE_FORCE_CXX03
    std::atomic<int> numPending;
#else
    int              numPending;    // number of pending tasks to still execute
#endif
    Semaphore        isEmpty;       // used to signal that the taskgroup is empty
#if defined(ENABLE_SEM_DTOR_WORKAROUND) || defined(ILMBASE_FORCE_CXX03)
    // this mutex is also used to lock numPending in legacy C++ mode...
    Mutex            dtorMutex;     // used to work around the glibc bug:
                                    // http://sources.redhat.com/bugzilla/show_bug.cgi?id=12674
#endif
};
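//
// Illustrative lifecycle of the counter above (a sketch; the method
// names are real, the two-task interleaving is hypothetical):
//
//     addTask ()     numPending 0 -> 1, isEmpty.wait () claims the semaphore
//     addTask ()     numPending 1 -> 2
//     removeTask ()  numPending 2 -> 1
//     removeTask ()  numPending 1 -> 0, isEmpty.post () unblocks ~Data ()
//
// The pair thus acts as an "inverted" semaphore: the group is signaled
// only while it holds no pending tasks.
//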
struct ThreadPool::Data
{
    typedef ThreadPoolProvider *TPPointer;

     Data ();
    ~Data ();

    struct SafeProvider
    {
        SafeProvider (Data *d, ThreadPoolProvider *p) : _data (d), _ptr (p)
        {
        }

        ~SafeProvider ()
        {
            if (_data)
                _data->coalesceProviderUse ();
        }

        SafeProvider (const SafeProvider &o)
            : _data (o._data), _ptr (o._ptr)
        {
            if (_data)
                _data->bumpProviderUse ();
        }

        SafeProvider &operator= (const SafeProvider &o)
        {
            if (this != &o)
            {
                if (o._data)
                    o._data->bumpProviderUse ();
                if (_data)
                    _data->coalesceProviderUse ();
                _data = o._data;
                _ptr = o._ptr;
            }
            return *this;
        }

#ifndef ILMBASE_FORCE_CXX03
        SafeProvider (SafeProvider &&o)
            : _data (o._data), _ptr (o._ptr)
        {
            o._data = nullptr;
        }

        SafeProvider &operator= (SafeProvider &&o)
        {
            std::swap (_data, o._data);
            std::swap (_ptr, o._ptr);
            return *this;
        }
#endif

        inline ThreadPoolProvider *get () const
        {
            return _ptr;
        }

        ThreadPoolProvider *operator-> () const
        {
            return get ();
        }

        Data *_data;
        ThreadPoolProvider *_ptr;
    };

    // NB: In C++20, there is full support for atomic shared_ptr, but that
    // is not yet in use or finalized. Once stabilized, add appropriate
    // usage here.
    inline SafeProvider getProvider ();
    inline void coalesceProviderUse ();
    inline void bumpProviderUse ();
    inline void setProvider (ThreadPoolProvider *p);

#ifdef ILMBASE_FORCE_CXX03
    Semaphore provSem;
    Mutex provMutex;
    int provUsers;
    ThreadPoolProvider *provider;
    ThreadPoolProvider *oldprovider;
#else
    std::atomic<ThreadPoolProvider *> provider;
    std::atomic<int> provUsers;
#endif
};

namespace {

class DefaultWorkerThread;

struct DefaultWorkData
{
    DefaultWorkData ()
#ifdef ILMBASE_FORCE_CXX03
        : stopping (false)
#else
        : hasThreads (false), stopping (false)
#endif
    {
        // the flags must start cleared; everything else default-constructs
    }

    Semaphore taskSemaphore;        // threads wait on this for ready tasks
    mutable Mutex taskMutex;        // mutual exclusion for the tasks list
    vector<Task*> tasks;            // the list of tasks to execute

    Semaphore threadSemaphore;      // signaled when a thread starts executing
    mutable Mutex threadMutex;      // mutual exclusion for threads list
    vector<DefaultWorkerThread*> threads;  // the list of all threads

#ifdef ILMBASE_FORCE_CXX03
    bool stopping;                  // flag indicating whether to stop threads
    mutable Mutex stopMutex;        // mutual exclusion for stopping flag
#else
    std::atomic<bool> hasThreads;
    std::atomic<bool> stopping;
#endif

    inline bool stopped () const
    {
#ifdef ILMBASE_FORCE_CXX03
        Lock lock (stopMutex);
        return stopping;
#else
        return stopping.load (std::memory_order_relaxed);
#endif
    }

    inline void stop ()
    {
#ifdef ILMBASE_FORCE_CXX03
        Lock lock (stopMutex);
#endif
        stopping = true;
    }
};

//
// class WorkerThread
//

class DefaultWorkerThread: public Thread
{
  public:

    DefaultWorkerThread (DefaultWorkData* data);

    virtual void run ();

  private:

    DefaultWorkData *  _data;
};

DefaultWorkerThread::DefaultWorkerThread (DefaultWorkData* data):
    _data (data)
{
    start ();
}

void
DefaultWorkerThread::run ()
{
    //
    // Signal that the thread has started executing
    //

    _data->threadSemaphore.post ();

    while (true)
    {
        //
        // Wait for a task to become available
        //

        _data->taskSemaphore.wait ();

        {
            Lock taskLock (_data->taskMutex);

            //
            // If there is a task pending, pop off the next task in the FIFO
            //

            if (!_data->tasks.empty ())
            {
                Task* task = _data->tasks.back ();
                _data->tasks.pop_back ();

                taskLock.release ();

                TaskGroup* taskGroup = task->group ();
                task->execute ();

                delete task;

                taskGroup->_data->removeTask ();
            }
            else if (_data->stopped ())
            {
                break;
            }
        }
    }
}
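//
// Informal note on the handoff protocol above (an invariant implied by
// the code, not enforced anywhere): taskSemaphore's count always equals
// the number of queued tasks plus the number of outstanding stop posts
// issued by finish(). Every post() therefore wakes exactly one worker,
// which either pops a task or, finding the queue empty, checks stopped()
// and exits its loop.
//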
//
// class DefaultThreadPoolProvider
//

class DefaultThreadPoolProvider : public ThreadPoolProvider
{
  public:
    DefaultThreadPoolProvider (int count);
    virtual ~DefaultThreadPoolProvider ();

    virtual int  numThreads () const;
    virtual void setNumThreads (int count);
    virtual void addTask (Task *task);

    virtual void finish ();

  private:
    DefaultWorkData _data;
};

DefaultThreadPoolProvider::DefaultThreadPoolProvider (int count)
{
    setNumThreads (count);
}

DefaultThreadPoolProvider::~DefaultThreadPoolProvider ()
{
    finish ();
}

int
DefaultThreadPoolProvider::numThreads () const
{
    Lock lock (_data.threadMutex);
    return static_cast<int> (_data.threads.size ());
}

void
DefaultThreadPoolProvider::setNumThreads (int count)
{
    //
    // Lock access to thread list and size
    //

    Lock lock (_data.threadMutex);

    size_t desired = static_cast<size_t> (count);
    if (desired > _data.threads.size ())
    {
        //
        // Add more threads
        //

        while (_data.threads.size () < desired)
            _data.threads.push_back (new DefaultWorkerThread (&_data));
    }
    else if (desired < _data.threads.size ())
    {
        //
        // Wait until all existing threads are finished processing,
        // then delete all threads.
        //

        finish ();

        //
        // Add in new threads
        //

        while (_data.threads.size () < desired)
            _data.threads.push_back (new DefaultWorkerThread (&_data));
    }

#ifndef ILMBASE_FORCE_CXX03
    _data.hasThreads = !(_data.threads.empty ());
#endif
}

void
DefaultThreadPoolProvider::addTask (Task *task)
{
    //
    // Lock the threads, needed to access numThreads
    //

#ifdef ILMBASE_FORCE_CXX03
    bool doPush;
    {
        Lock lock (_data.threadMutex);
        doPush = !_data.threads.empty ();
    }
#else
    bool doPush = _data.hasThreads.load (std::memory_order_relaxed);
#endif

    if (doPush)
    {
        //
        // Get exclusive access to the tasks queue
        //

        {
            Lock taskLock (_data.taskMutex);

            //
            // Push the new task into the FIFO
            //

            _data.tasks.push_back (task);
        }

        //
        // Signal that we have a new task to process
        //

        _data.taskSemaphore.post ();
    }
    else
    {
        // This path shouldn't normally happen, since we have the
        // NullThreadPoolProvider, but just in case...
        task->execute ();
        task->group ()->_data->removeTask ();
        delete task;
    }
}

void
DefaultThreadPoolProvider::finish ()
{
    _data.stop ();

    //
    // Signal enough times to allow all threads to stop.
    //
    // Wait until all threads have started their run functions.
    // If we do not wait before we destroy the threads, then it's
    // possible that the threads have not yet called their run
    // functions.
    // If this happens, then the run function will be called off
    // of an invalid object and we will crash, most likely with
    // an error like: "pure virtual method called"
    //

    size_t curT = _data.threads.size ();
    for (size_t i = 0; i != curT; ++i)
    {
        _data.taskSemaphore.post ();
        _data.threadSemaphore.wait ();
    }

    //
    // Join all the threads
    //

    for (size_t i = 0; i != curT; ++i)
        delete _data.threads[i];

    Lock lock1 (_data.taskMutex);
#ifdef ILMBASE_FORCE_CXX03
    Lock lock2 (_data.stopMutex);
#endif
    _data.threads.clear ();
    _data.tasks.clear ();

    _data.stopping = false;
}

class NullThreadPoolProvider : public ThreadPoolProvider
{
    virtual ~NullThreadPoolProvider () {}
    virtual int  numThreads () const { return 0; }
    virtual void setNumThreads (int count) {}
    virtual void addTask (Task *t)
    {
        t->execute ();
        t->group ()->_data->removeTask ();
        delete t;
    }
    virtual void finish () {}
};

} // namespace

//
// struct TaskGroup::Data
//

TaskGroup::Data::Data (): numPending (0), isEmpty (1)
{
    // empty
}

TaskGroup::Data::~Data ()
{
    //
    // A TaskGroup acts like an "inverted" semaphore: if the count
    // is above 0 then waiting on the taskgroup will block. This
    // destructor waits until the taskgroup is empty before returning.
    //

    isEmpty.wait ();

#ifdef ENABLE_SEM_DTOR_WORKAROUND
    // Update: this was fixed in glibc 2.21, so the ifdef above
    // checks for older versions.
    //
    // Alas, given the bug in older glibc we need a secondary
    // synchronisation primitive here to account for the fact that
    // destructing the isEmpty Semaphore in this thread can cause
    // an error for a separate thread that is issuing the post() call.
    // We are entitled to destruct the semaphore at this point; however,
    // that post() call attempts to access data out of the associated
    // memory *after* it has woken the waiting threads, including this one,
    // potentially leading to invalid memory reads.
    // http://sources.redhat.com/bugzilla/show_bug.cgi?id=12674

    Lock lock (dtorMutex);
#endif
}
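//
// The race the workaround above guards against, sketched informally
// (thread A owns the TaskGroup, thread B runs its last task):
//
//     B: --numPending reaches 0, calls isEmpty.post ()
//     A: isEmpty.wait () returns; ~Data () continues and destroys isEmpty
//     B: post () (in buggy glibc) still touches the semaphore's memory
//        after waking A -> read of freed memory
//
// Holding dtorMutex in both removeTask() and ~Data() delays the
// destruction until post() has fully returned.
//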
void
TaskGroup::Data::addTask ()
{
    //
    // In C++11 and later, numPending is an atomic, which avoids the
    // extra lock; in C++98 mode the lock below also guards numPending,
    // which is needed to support custom thread pool providers.
    //

#ifdef ILMBASE_FORCE_CXX03
    Lock lock (dtorMutex);
#endif
    if (numPending++ == 0)
        isEmpty.wait ();
}

void
TaskGroup::Data::removeTask ()
{
    // Alas, given the bug in older glibc we need a secondary
    // synchronisation primitive here to account for the fact that
    // destructing the isEmpty Semaphore in a separate thread can
    // cause an error. When issuing the post() call, the affected glibc
    // implementation attempts to access memory *after* it has woken
    // waiting threads.
    // Since other threads are entitled to delete the semaphore, the
    // access to the memory location can be invalid.
    // http://sources.redhat.com/bugzilla/show_bug.cgi?id=12674
    // Update: this bug has been fixed in glibc 2.21 (see the
    // ENABLE_SEM_DTOR_WORKAROUND check above).
    //
    // Further update:
    //
    // We could remove this entirely on a new enough glibc; however,
    // we've changed the API to enable a custom override of the thread
    // pool. In order to provide safe access to numPending, we need the
    // lock anyway, except in C++11 or newer.

#ifdef ILMBASE_FORCE_CXX03
    Lock lock (dtorMutex);

    if (--numPending == 0)
        isEmpty.post ();
#else
    if (--numPending == 0)
    {
#ifdef ENABLE_SEM_DTOR_WORKAROUND
        Lock lock (dtorMutex);
#endif
        isEmpty.post ();
    }
#endif
}

//
// struct ThreadPool::Data
//

ThreadPool::Data::Data ():
#ifdef ILMBASE_FORCE_CXX03
    provUsers (0), provider (NULL), oldprovider (NULL)
#else
    provider (NULL), provUsers (0)
#endif
{
    // empty
}

ThreadPool::Data::~Data ()
{
    //
    // finish() joins all of the provider's worker threads, after which
    // it is safe to delete the provider itself (it was allocated with
    // new, and setProvider deletes replaced providers the same way).
    //

#ifdef ILMBASE_FORCE_CXX03
    provider->finish ();
    delete provider;
#else
    ThreadPoolProvider *p = provider.load (std::memory_order_relaxed);
    p->finish ();
    delete p;
#endif
}

inline ThreadPool::Data::SafeProvider
ThreadPool::Data::getProvider ()
{
#ifdef ILMBASE_FORCE_CXX03
    Lock provLock (provMutex);
    ++provUsers;
    return SafeProvider (this, provider);
#else
    provUsers.fetch_add (1, std::memory_order_relaxed);
    return SafeProvider (this, provider.load (std::memory_order_relaxed));
#endif
}

inline void
ThreadPool::Data::coalesceProviderUse ()
{
#ifdef ILMBASE_FORCE_CXX03
    Lock provLock (provMutex);
    --provUsers;
    if (provUsers == 0)
    {
        if (oldprovider)
            provSem.post ();
    }
#else
    // fetch_sub returns the previous value, so a result of 1 means the
    // count just dropped to 0. Nothing needs to be signalled in the
    // C++11 path: setProvider() spins on provUsers instead of waiting
    // on a semaphore.
    provUsers.fetch_sub (1, std::memory_order_relaxed);
#endif
}

inline void
ThreadPool::Data::bumpProviderUse ()
{
#ifdef ILMBASE_FORCE_CXX03
    Lock lock (provMutex);
    ++provUsers;
#else
    provUsers.fetch_add (1, std::memory_order_relaxed);
#endif
}

inline void
ThreadPool::Data::setProvider (ThreadPoolProvider *p)
{
#ifdef ILMBASE_FORCE_CXX03
    Lock provLock (provMutex);

    if (oldprovider)
        throw IEX_INTERNAL_NAMESPACE::ArgExc
            ("Attempt to set the thread pool provider while"
             " another thread is currently setting the provider.");

    oldprovider = provider;
    provider = p;

    while (provUsers > 0)
    {
        provLock.release ();
        provSem.wait ();
        provLock.acquire ();
    }

    if (oldprovider)
    {
        oldprovider->finish ();
        delete oldprovider;
        oldprovider = NULL;
    }
#else
    //
    // Swap in the new provider. compare_exchange_weak may fail
    // spuriously, so retry until the exchange succeeds; on failure it
    // reloads the current pointer into old.
    //

    ThreadPoolProvider *old = provider.load (std::memory_order_relaxed);
    while (! provider.compare_exchange_weak (old, p,
                                             std::memory_order_release,
                                             std::memory_order_relaxed))
    {
        // empty; 'old' has been refreshed with the current value
    }

    // Wait for any other users to finish prior to deleting. Given that
    // these uses are mostly to query the thread count or push a task to
    // the queue (so fast), just spin...
    //
    // (Normally, people don't change the provider mid-stream anyway, so
    // this will be 0 nearly all of the time, but just to be safe.)
    //

    while (provUsers.load (std::memory_order_relaxed) > 0)
        std::this_thread::yield ();

    if (old)
    {
        old->finish ();
        delete old;
    }

    // NB: A shared_ptr mechanism would be safer and would make the
    // provUsers counter unnecessary, since the shared_ptr maintains
    // that count for us. However, gcc 4.8/4.9, which are still in wide
    // use, do not ship the atomic shared_ptr functions. Once those
    // compilers are fully deprecated, switch to the code below, change
    // provider to a std::shared_ptr, and remove provUsers...
    //
    // std::shared_ptr<ThreadPoolProvider> newp (p);
    // std::shared_ptr<ThreadPoolProvider> curp =
    //     std::atomic_load_explicit (&provider, std::memory_order_relaxed);
    // while (! std::atomic_compare_exchange_weak_explicit (
    //            &provider, &curp, newp,
    //            std::memory_order_release, std::memory_order_relaxed))
    //     ;
    // if (curp)
    //     curp->finish ();
#endif
}
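//
// Illustrative sketch of the custom-provider hook that setProvider()
// enables (hypothetical class, not part of this file): a provider that
// runs every task inline on the calling thread. It would be installed
// through the public ThreadPool::setThreadProvider() wrapper, with
// TaskGroup bookkeeping done via the public finishOneTask() accessor.
//
//     class InlineProvider : public ThreadPoolProvider
//     {
//       public:
//         virtual ~InlineProvider () {}
//         virtual int  numThreads () const    { return 0; }
//         virtual void setNumThreads (int)    {}
//         virtual void addTask (Task *t)
//         {
//             t->execute ();
//             t->group ()->finishOneTask ();
//             delete t;
//         }
//         virtual void finish () {}
//     };
//
//     ThreadPool::globalThreadPool ().setThreadProvider (new InlineProvider);
//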
//
// class Task
//

Task::Task (TaskGroup* g): _group (g)
{
    if (g)
        g->_data->addTask ();
}

Task::~Task ()
{
    // empty
}

TaskGroup*
Task::group ()
{
    return _group;
}

TaskGroup::TaskGroup ():
    _data (new Data ())
{
    // empty
}

TaskGroup::~TaskGroup ()
{
    delete _data;
}

void
TaskGroup::finishOneTask ()
{
    _data->removeTask ();
}

//
// class ThreadPoolProvider
//

ThreadPoolProvider::ThreadPoolProvider ()
{
}

ThreadPoolProvider::~ThreadPoolProvider ()
{
}

//
// class ThreadPool
//

ThreadPool::ThreadPool (unsigned nthreads):
    _data (new Data)
{
    if (nthreads == 0)
        _data->setProvider (new NullThreadPoolProvider);
    else
        _data->setProvider (new DefaultThreadPoolProvider (int (nthreads)));
}

ThreadPool::~ThreadPool ()
{
    delete _data;
}

int
ThreadPool::numThreads () const
{
    return _data->getProvider ()->numThreads ();
}

void
ThreadPool::setNumThreads (int count)
{
    if (count < 0)
        throw IEX_INTERNAL_NAMESPACE::ArgExc
            ("Attempt to set the number of threads "
             "in a thread pool to a negative value.");

    bool doReset = false;
    {
        Data::SafeProvider sp = _data->getProvider ();
        int curT = sp->numThreads ();
        if (curT == count)
            return;

        if (curT == 0)
        {
            NullThreadPoolProvider *npp =
                dynamic_cast<NullThreadPoolProvider *> (sp.get ());
            if (npp)
                doReset = true;
        }
        else if (count == 0)
        {
            DefaultThreadPoolProvider *dpp =
                dynamic_cast<DefaultThreadPoolProvider *> (sp.get ());
            if (dpp)
                doReset = true;
        }
        if (! doReset)
            sp->setNumThreads (count);
    }

    if (doReset)
    {
        if (count == 0)
            _data->setProvider (new NullThreadPoolProvider);
        else
            _data->setProvider (new DefaultThreadPoolProvider (count));
    }
}

void
ThreadPool::setThreadProvider (ThreadPoolProvider *provider)
{
    _data->setProvider (provider);
}

void
ThreadPool::addTask (Task* task)
{
    _data->getProvider ()->addTask (task);
}

ThreadPool&
ThreadPool::globalThreadPool ()
{
    //
    // The global thread pool
    //

    static ThreadPool gThreadPool (0);

    return gThreadPool;
}

void
ThreadPool::addGlobalTask (Task* task)
{
    globalThreadPool ().addTask (task);
}

ILMTHREAD_INTERNAL_NAMESPACE_SOURCE_EXIT
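//
// Usage sketch for the classes above (illustrative only; FrameTask and
// processFrame are hypothetical):
//
//     class FrameTask : public Task
//     {
//       public:
//         FrameTask (TaskGroup *g, int frame): Task (g), _frame (frame) {}
//         virtual void execute () { processFrame (_frame); }
//       private:
//         int _frame;
//     };
//
//     {
//         TaskGroup group;
//
//         for (int i = 0; i < 100; ++i)
//             ThreadPool::addGlobalTask (new FrameTask (&group, i));
//     }
//     // leaving the scope runs ~TaskGroup, which blocks until all 100
//     // tasks have executed; each task is deleted by the pool itself
//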