Files
ogre-base/blender-build/build_environment/patches/openexr_b18905772e.diff
2025-08-17 06:33:24 +03:00

1024 lines
30 KiB
Diff

From b18905772e9dd98658f8a37d16c6e53c7c17adaa Mon Sep 17 00:00:00 2001
From: Kimball Thurston <kdt3rd@gmail.com>
Date: Sat, 22 Apr 2023 15:58:43 +1200
Subject: [PATCH] Change setNumThreads to wait for thread start (#1291)
Fix Issue #890, issue with windows shutdown when exiting quickly prior to
threads actually starting. We do this by having a data block that is passed
by shared_ptr to the thread to avoid dereferencing a deleted object.
Further, greatly simplify the ThreadPool code by using atomic shared_ptr
functions instead of trying to manually implement something similar and
otherwise modernizing the thread code.
Fix a few potential null dereference locations and also fix an issue where,
on overloaded systems, the TaskGroup destructor could destroy the
semaphore while code is still using it, causing memory corruption
if some other thread immediately allocates that same block.
Originally based on a proposed patch by Dieter De Baets @debaetsd
Signed-off-by: Kimball Thurston <kdt3rd@gmail.com>
---
src/lib/IlmThread/IlmThreadPool.cpp | 740 ++++++++++---------------
src/lib/IlmThread/IlmThreadSemaphore.h | 8 +-
2 files changed, 303 insertions(+), 445 deletions(-)
diff --git a/src/lib/IlmThread/IlmThreadPool.cpp b/src/lib/IlmThread/IlmThreadPool.cpp
index 0ddcf8d52..f4578a510 100644
--- a/src/lib/IlmThread/IlmThreadPool.cpp
+++ b/src/lib/IlmThread/IlmThreadPool.cpp
@@ -15,12 +15,17 @@
#include "Iex.h"
#include <atomic>
+#include <limits>
#include <memory>
#include <mutex>
#include <vector>
#include <thread>
-using namespace std;
+#if (defined(_WIN32) || defined(_WIN64))
+# include <windows.h>
+#else
+# include <unistd.h>
+#endif
ILMTHREAD_INTERNAL_NAMESPACE_SOURCE_ENTER
@@ -28,380 +33,301 @@ ILMTHREAD_INTERNAL_NAMESPACE_SOURCE_ENTER
# define ENABLE_THREADING
#endif
-#if defined(__GNU_LIBRARY__) && ( __GLIBC__ < 2 || ( __GLIBC__ == 2 && __GLIBC_MINOR__ < 21 ) )
-# define ENABLE_SEM_DTOR_WORKAROUND
-#endif
-
-#ifdef ENABLE_THREADING
-
-struct TaskGroup::Data
+namespace
{
- Data ();
- ~Data ();
-
- void addTask () ;
- void removeTask ();
- std::atomic<int> numPending;
- Semaphore isEmpty; // used to signal that the taskgroup is empty
-#if defined(ENABLE_SEM_DTOR_WORKAROUND)
- // this mutex is also used to lock numPending in the legacy c++ mode...
- std::mutex dtorMutex; // used to work around the glibc bug:
- // http://sources.redhat.com/bugzilla/show_bug.cgi?id=12674
-#endif
-};
-
-struct ThreadPool::Data
+static inline void
+handleProcessTask (Task* task)
{
- typedef ThreadPoolProvider *TPPointer;
-
- Data ();
- ~Data();
- Data (const Data&) = delete;
- Data &operator= (const Data&) = delete;
- Data (Data&&) = delete;
- Data &operator= (Data&&) = delete;
-
- struct SafeProvider
+ if (task)
{
- SafeProvider (Data *d, ThreadPoolProvider *p) : _data( d ), _ptr( p )
- {
- }
-
- ~SafeProvider()
- {
- if ( _data )
- _data->coalesceProviderUse();
- }
- SafeProvider (const SafeProvider &o)
- : _data( o._data ), _ptr( o._ptr )
- {
- if ( _data )
- _data->bumpProviderUse();
- }
- SafeProvider &operator= (const SafeProvider &o)
- {
- if ( this != &o )
- {
- if ( o._data )
- o._data->bumpProviderUse();
- if ( _data )
- _data->coalesceProviderUse();
- _data = o._data;
- _ptr = o._ptr;
- }
- return *this;
- }
- SafeProvider( SafeProvider &&o )
- : _data( o._data ), _ptr( o._ptr )
- {
- o._data = nullptr;
- }
- SafeProvider &operator=( SafeProvider &&o )
- {
- std::swap( _data, o._data );
- std::swap( _ptr, o._ptr );
- return *this;
- }
-
- inline ThreadPoolProvider *get () const
- {
- return _ptr;
- }
- ThreadPoolProvider *operator-> () const
- {
- return get();
- }
-
- Data *_data;
- ThreadPoolProvider *_ptr;
- };
-
- // NB: In C++20, there is full support for atomic shared_ptr, but that is not
- // yet in use or finalized. Once stabilized, add appropriate usage here
- inline SafeProvider getProvider ();
- inline void coalesceProviderUse ();
- inline void bumpProviderUse ();
- inline void setProvider (ThreadPoolProvider *p);
-
- std::atomic<int> provUsers;
- std::atomic<ThreadPoolProvider *> provider;
-};
-
+ TaskGroup* taskGroup = task->group ();
+ task->execute ();
-namespace {
+ // kill the task prior to notifying the group
+ // such that any internal reference-based
+ // semantics will be handled prior to
+ // the task group destructor letting it out
+ // of the scope of those references
+ delete task;
-class DefaultWorkerThread;
+ if (taskGroup) taskGroup->finishOneTask ();
+ }
+}
-struct DefaultWorkData
+#ifdef ENABLE_THREADING
+struct DefaultThreadPoolData
{
- Semaphore taskSemaphore; // threads wait on this for ready tasks
- mutable std::mutex taskMutex; // mutual exclusion for the tasks list
- vector<Task*> tasks; // the list of tasks to execute
+ Semaphore _taskSemaphore; // threads wait on this for ready tasks
+ mutable std::mutex _taskMutex; // mutual exclusion for the tasks list
+ std::vector<Task*> _tasks; // the list of tasks to execute
- Semaphore threadSemaphore; // signaled when a thread starts executing
- mutable std::mutex threadMutex; // mutual exclusion for threads list
- vector<DefaultWorkerThread*> threads; // the list of all threads
-
- std::atomic<bool> hasThreads;
- std::atomic<bool> stopping;
+ mutable std::mutex _threadMutex; // mutual exclusion for threads list
+ std::vector<std::thread> _threads; // the list of all threads
+
+ std::atomic<int> _threadCount;
+ std::atomic<bool> _stopping;
inline bool stopped () const
{
- return stopping.load( std::memory_order_relaxed );
+ return _stopping.load (std::memory_order_relaxed);
}
- inline void stop ()
+ inline void stop () { _stopping = true; }
+
+ inline void resetAtomics ()
{
- stopping = true;
+ _threadCount = 0;
+ _stopping = false;
}
};
+#endif
-//
-// class WorkerThread
-//
-class DefaultWorkerThread: public Thread
+} // namespace
+
+#ifdef ENABLE_THREADING
+
+struct TaskGroup::Data
{
- public:
+ Data ();
+ ~Data ();
+ Data (const Data&) = delete;
+ Data& operator= (const Data&) = delete;
+ Data (Data&&) = delete;
+ Data& operator= (Data&&) = delete;
- DefaultWorkerThread (DefaultWorkData* data);
+ void addTask ();
+ void removeTask ();
- virtual void run ();
-
- private:
+ void waitForEmpty ();
- DefaultWorkData * _data;
+ std::atomic<int> numPending;
+ std::atomic<int> inFlight;
+ Semaphore isEmpty; // used to signal that the taskgroup is empty
};
-DefaultWorkerThread::DefaultWorkerThread (DefaultWorkData* data):
- _data (data)
+struct ThreadPool::Data
{
- start();
-}
-
+ using ProviderPtr = std::shared_ptr<ThreadPoolProvider>;
-void
-DefaultWorkerThread::run ()
-{
- //
- // Signal that the thread has started executing
- //
+ Data ();
+ ~Data ();
+ Data (const Data&) = delete;
+ Data& operator= (const Data&) = delete;
+ Data (Data&&) = delete;
+ Data& operator= (Data&&) = delete;
- _data->threadSemaphore.post();
+ ProviderPtr getProvider () const { return std::atomic_load (&_provider); }
- while (true)
+ void setProvider (ProviderPtr provider)
{
- //
- // Wait for a task to become available
- //
-
- _data->taskSemaphore.wait();
-
- {
- std::unique_lock<std::mutex> taskLock (_data->taskMutex);
-
- //
- // If there is a task pending, pop off the next task in the FIFO
- //
+ ProviderPtr curp = std::atomic_exchange (&_provider, provider);
+ if (curp && curp != provider) curp->finish ();
+ }
- if (!_data->tasks.empty())
- {
- Task* task = _data->tasks.back();
- _data->tasks.pop_back();
- // release the mutex while we process
- taskLock.unlock();
+ std::shared_ptr<ThreadPoolProvider> _provider;
+};
- TaskGroup* taskGroup = task->group();
- task->execute();
- delete task;
-
- taskGroup->_data->removeTask ();
- }
- else if (_data->stopped())
- {
- break;
- }
- }
- }
-}
+namespace {
//
// class DefaultThreadPoolProvider
//
class DefaultThreadPoolProvider : public ThreadPoolProvider
{
- public:
- DefaultThreadPoolProvider(int count);
- virtual ~DefaultThreadPoolProvider();
+public:
+ DefaultThreadPoolProvider (int count);
+ DefaultThreadPoolProvider (const DefaultThreadPoolProvider&) = delete;
+ DefaultThreadPoolProvider&
+ operator= (const DefaultThreadPoolProvider&) = delete;
+ DefaultThreadPoolProvider (DefaultThreadPoolProvider&&) = delete;
+ DefaultThreadPoolProvider& operator= (DefaultThreadPoolProvider&&) = delete;
+ ~DefaultThreadPoolProvider () override;
+
+ int numThreads () const override;
+ void setNumThreads (int count) override;
+ void addTask (Task* task) override;
- virtual int numThreads() const;
- virtual void setNumThreads(int count);
- virtual void addTask(Task *task);
+ void finish () override;
- virtual void finish();
+private:
+ void lockedFinish ();
+ void threadLoop (std::shared_ptr<DefaultThreadPoolData> d);
- private:
- DefaultWorkData _data;
+ std::shared_ptr<DefaultThreadPoolData> _data;
};
DefaultThreadPoolProvider::DefaultThreadPoolProvider (int count)
+ : _data (std::make_shared<DefaultThreadPoolData> ())
{
- setNumThreads(count);
+ _data->resetAtomics ();
+ setNumThreads (count);
}
DefaultThreadPoolProvider::~DefaultThreadPoolProvider ()
-{
- finish();
-}
+{}
int
DefaultThreadPoolProvider::numThreads () const
{
- std::lock_guard<std::mutex> lock (_data.threadMutex);
- return static_cast<int> (_data.threads.size());
+ return _data->_threadCount.load ();
}
void
DefaultThreadPoolProvider::setNumThreads (int count)
{
- //
- // Lock access to thread list and size
- //
+ // since we're a private class, the thread pool won't call us if
+ // we aren't changing size so no need to check that...
- std::lock_guard<std::mutex> lock (_data.threadMutex);
+ std::lock_guard<std::mutex> lock (_data->_threadMutex);
- size_t desired = static_cast<size_t>(count);
- if (desired > _data.threads.size())
- {
- //
- // Add more threads
- //
+ size_t curThreads = _data->_threads.size ();
+ size_t nToAdd = static_cast<size_t> (count);
- while (_data.threads.size() < desired)
- _data.threads.push_back (new DefaultWorkerThread (&_data));
- }
- else if ((size_t)count < _data.threads.size())
+ if (nToAdd < curThreads)
{
- //
- // Wait until all existing threads are finished processing,
- // then delete all threads.
- //
- finish ();
-
- //
- // Add in new threads
- //
-
- while (_data.threads.size() < desired)
- _data.threads.push_back (new DefaultWorkerThread (&_data));
+ // no easy way to only shutdown the n threads at the end of
+ // the vector (well, really, guaranteeing they are the ones to
+ // be woken up), so just kill all of the threads
+ lockedFinish ();
+ curThreads = 0;
}
- _data.hasThreads = !(_data.threads.empty());
+ _data->_threads.resize (nToAdd);
+ for (size_t i = curThreads; i < nToAdd; ++i)
+ {
+ _data->_threads[i] =
+ std::thread (&DefaultThreadPoolProvider::threadLoop, this, _data);
+ }
+ _data->_threadCount = static_cast<int> (_data->_threads.size ());
}
void
DefaultThreadPoolProvider::addTask (Task *task)
{
- //
- // Lock the threads, needed to access numThreads
- //
- bool doPush = _data.hasThreads.load( std::memory_order_relaxed );
-
- if ( doPush )
+ // the thread pool will kill us and switch to a null provider
+ // if the thread count is set to 0, so we can always
+ // go ahead and lock and assume we have a thread to do the
+ // processing
{
- //
- // Get exclusive access to the tasks queue
- //
+ std::lock_guard<std::mutex> taskLock (_data->_taskMutex);
- {
- std::lock_guard<std::mutex> taskLock (_data.taskMutex);
-
- //
- // Push the new task into the FIFO
- //
- _data.tasks.push_back (task);
- }
-
//
- // Signal that we have a new task to process
+ // Push the new task into the FIFO
//
- _data.taskSemaphore.post ();
- }
- else
- {
- // this path shouldn't normally happen since we have the
- // NullThreadPoolProvider, but just in case...
- task->execute ();
- task->group()->_data->removeTask ();
- delete task;
+ _data->_tasks.push_back (task);
}
+
+ //
+ // Signal that we have a new task to process
+ //
+ _data->_taskSemaphore.post ();
}
void
DefaultThreadPoolProvider::finish ()
{
- _data.stop();
+ std::lock_guard<std::mutex> lock (_data->_threadMutex);
+
+ lockedFinish ();
+}
+
+void
+DefaultThreadPoolProvider::lockedFinish ()
+{
+ _data->stop ();
//
// Signal enough times to allow all threads to stop.
//
- // Wait until all threads have started their run functions.
- // If we do not wait before we destroy the threads then it's
- // possible that the threads have not yet called their run
- // functions.
- // If this happens then the run function will be called off
- // of an invalid object and we will crash, most likely with
- // an error like: "pure virtual method called"
+ // NB: we must do this as many times as we have threads.
//
-
- size_t curT = _data.threads.size();
+ // If there is still work in the queue, or this call happens "too
+ // quickly", threads will not be waiting on the semaphore, so we
+ // need to ensure the semaphore is at a count equal to the amount
+ // of work left plus the number of threads to ensure exit of a
+ // thread. There can be threads in a few states:
+ // - still starting up (successive calls to setNumThreads)
+ // - in the middle of processing a task / looping
+ // - waiting in the semaphore
+ size_t curT = _data->_threads.size ();
for (size_t i = 0; i != curT; ++i)
- {
- if (_data.threads[i]->joinable())
- {
- _data.taskSemaphore.post();
- _data.threadSemaphore.wait();
- }
- }
+ _data->_taskSemaphore.post ();
//
- // Join all the threads
+ // We should not need to check joinability, they should all, by
+ // definition, be joinable (assuming normal start)
//
for (size_t i = 0; i != curT; ++i)
{
- if (_data.threads[i]->joinable())
- _data.threads[i]->join();
- delete _data.threads[i];
+ // This isn't quite right in that the thread may actually
+ // be in an exited / signalled state (needing the
+ // WaitForSingleObject call), and so already have an exit code
+ // (I think, but the docs are vague), but if we don't do the
+ // join, the stl thread seems to then throw an exception. The
+ // join should just return invalid handle and continue, and is
+ // more of a windows bug... except maybe someone needs to work
+ // around it...
+ //# ifdef TEST_FOR_WIN_THREAD_STATUS
+ //
+ // // per OIIO issue #2038, on exit / dll unload, windows may
+ // // kill the thread, double check that it is still active prior
+ // // to joining.
+ // DWORD tstatus;
+ // if (GetExitCodeThread (_threads[i].native_handle (), &tstatus))
+ // {
+ // if (tstatus != STILL_ACTIVE) { continue; }
+ // }
+ //# endif
+
+ _data->_threads[i].join ();
}
- std::lock_guard<std::mutex> lk( _data.taskMutex );
-
- _data.threads.clear();
- _data.tasks.clear();
+ _data->_threads.clear ();
- _data.stopping = false;
+ _data->resetAtomics ();
}
-
-class NullThreadPoolProvider : public ThreadPoolProvider
+void
+DefaultThreadPoolProvider::threadLoop (
+ std::shared_ptr<DefaultThreadPoolData> data)
{
- virtual ~NullThreadPoolProvider() {}
- virtual int numThreads () const { return 0; }
- virtual void setNumThreads (int count)
- {
- }
- virtual void addTask (Task *t)
+ while (true)
{
- t->execute ();
- t->group()->_data->removeTask ();
- delete t;
+ //
+ // Wait for a task to become available
+ //
+
+ data->_taskSemaphore.wait ();
+
+ {
+ std::unique_lock<std::mutex> taskLock (data->_taskMutex);
+
+ //
+ // If there is a task pending, pop off the next task in the FIFO
+ //
+
+ if (!data->_tasks.empty ())
+ {
+ Task* task = data->_tasks.back ();
+ data->_tasks.pop_back ();
+
+ // release the mutex while we process
+ taskLock.unlock ();
+
+ handleProcessTask (task);
+
+ // do not need to reacquire the lock at all since we
+ // will just loop around, pull any other task
+ }
+ else if (data->stopped ()) { break; }
+ }
}
- virtual void finish () {}
-};
+}
} //namespace
@@ -409,81 +335,69 @@ class NullThreadPoolProvider : public ThreadPoolProvider
// struct TaskGroup::Data
//
-TaskGroup::Data::Data () : numPending (0), isEmpty (1)
-{
- // empty
-}
+TaskGroup::Data::Data () : numPending (0), inFlight (0), isEmpty (1)
+{}
TaskGroup::Data::~Data ()
+{}
+
+void
+TaskGroup::Data::waitForEmpty ()
{
//
// A TaskGroup acts like an "inverted" semaphore: if the count
- // is above 0 then waiting on the taskgroup will block. This
+ // is above 0 then waiting on the taskgroup will block. The
// destructor waits until the taskgroup is empty before returning.
//
isEmpty.wait ();
-#ifdef ENABLE_SEM_DTOR_WORKAROUND
- // Update: this was fixed in v. 2.2.21, so this ifdef checks for that
- //
- // Alas, given the current bug in glibc we need a secondary
- // syncronisation primitive here to account for the fact that
- // destructing the isEmpty Semaphore in this thread can cause
- // an error for a separate thread that is issuing the post() call.
- // We are entitled to destruct the semaphore at this point, however,
- // that post() call attempts to access data out of the associated
- // memory *after* it has woken the waiting threads, including this one,
- // potentially leading to invalid memory reads.
- // http://sources.redhat.com/bugzilla/show_bug.cgi?id=12674
-
- std::lock_guard<std::mutex> lock (dtorMutex);
-#endif
+ // pseudo spin to wait for the notifying thread to finish the post
+ // to avoid a premature deletion of the semaphore
+ int count = 0;
+ while (inFlight.load () > 0)
+ {
+ ++count;
+ if (count > 100)
+ {
+ std::this_thread::yield ();
+ count = 0;
+ }
+ }
}
void
TaskGroup::Data::addTask ()
{
- //
- // in c++11, we use an atomic to protect numPending to avoid the
- // extra lock but for c++98, to add the ability for custom thread
- // pool we add the lock here
- //
- if (numPending++ == 0)
- isEmpty.wait ();
+ inFlight.fetch_add (1);
+
+ // if we are the first task off the rank, clear the
+ // isEmpty semaphore such that the group will actually pause
+ // until the task finishes
+ if (numPending.fetch_add (1) == 0) { isEmpty.wait (); }
}
void
TaskGroup::Data::removeTask ()
{
- // Alas, given the current bug in glibc we need a secondary
- // syncronisation primitive here to account for the fact that
- // destructing the isEmpty Semaphore in a separate thread can
- // cause an error. Issuing the post call here the current libc
- // implementation attempts to access memory *after* it has woken
- // waiting threads.
- // Since other threads are entitled to delete the semaphore the
- // access to the memory location can be invalid.
- // http://sources.redhat.com/bugzilla/show_bug.cgi?id=12674
- // Update: this bug has been fixed, but how do we know which
- // glibc version we're in?
-
- // Further update:
+ // if we are the last task, notify the group we're done
+ if (numPending.fetch_sub (1) == 1) { isEmpty.post (); }
+
+ // in theory, a background thread could actually finish a task
+ // prior to the next task being added. The fetch_add / fetch_sub
+ // logic between addTask and removeTask is fine to keep the
+ // inverted semaphore straight. All addTask must happen prior to
+ // the TaskGroup destructor.
//
- // we could remove this if it is a new enough glibc, however
- // we've changed the API to enable a custom override of a
- // thread pool. In order to provide safe access to the numPending,
- // we need the lock anyway, except for c++11 or newer
- if (--numPending == 0)
- {
-#ifdef ENABLE_SEM_DTOR_WORKAROUND
- std::lock_guard<std::mutex> lk (dtorMutex);
-#endif
- isEmpty.post ();
- }
+ // But to let the taskgroup thread waiting know we're actually
+ // finished with the last one and finished posting (the semaphore
+ // might wake up the other thread while in the middle of post) so
+ // we don't destroy the semaphore while posting to it, keep a
+ // separate counter that is modified pre / post semaphore
+ inFlight.fetch_sub (1);
}
@@ -491,8 +405,7 @@ TaskGroup::Data::removeTask ()
// struct ThreadPool::Data
//
-ThreadPool::Data::Data ():
- provUsers (0), provider (NULL)
+ThreadPool::Data::Data ()
{
// empty
}
@@ -500,82 +413,7 @@ ThreadPool::Data::Data ():
ThreadPool::Data::~Data()
{
- ThreadPoolProvider *p = provider.load( std::memory_order_relaxed );
- p->finish();
- delete p;
-}
-
-inline ThreadPool::Data::SafeProvider
-ThreadPool::Data::getProvider ()
-{
- provUsers.fetch_add( 1, std::memory_order_relaxed );
- return SafeProvider( this, provider.load( std::memory_order_relaxed ) );
-}
-
-
-inline void
-ThreadPool::Data::coalesceProviderUse ()
-{
- int ov = provUsers.fetch_sub( 1, std::memory_order_relaxed );
- // ov is the previous value, so one means that now it might be 0
- if ( ov == 1 )
- {
- // do we have anything to do here?
- }
-}
-
-
-inline void
-ThreadPool::Data::bumpProviderUse ()
-{
- provUsers.fetch_add( 1, std::memory_order_relaxed );
-}
-
-
-inline void
-ThreadPool::Data::setProvider (ThreadPoolProvider *p)
-{
- ThreadPoolProvider *old = provider.load( std::memory_order_relaxed );
- // work around older gcc bug just in case
- do
- {
- if ( ! provider.compare_exchange_weak( old, p, std::memory_order_release, std::memory_order_relaxed ) )
- continue;
- } while (false); // NOSONAR - suppress SonarCloud bug report.
-
- // wait for any other users to finish prior to deleting, given
- // that these are just mostly to query the thread count or push a
- // task to the queue (so fast), just spin...
- //
- // (well, and normally, people don't do this mid stream anyway, so
- // this will be 0 99.999% of the time, but just to be safe)
- //
- while ( provUsers.load( std::memory_order_relaxed ) > 0 )
- std::this_thread::yield();
-
- if ( old )
- {
- old->finish();
- delete old;
- }
-
- // NB: the shared_ptr mechanism is safer and means we don't have
- // to have the provUsers counter since the shared_ptr keeps that
- // for us. However, gcc 4.8/9 compilers which many people are
- // still using even though it is 2018 forgot to add the shared_ptr
- // functions... once that compiler is fully deprecated, switch to
- // using the below, change provider to a std::shared_ptr and remove
- // provUsers...
- //
-// std::shared_ptr<ThreadPoolProvider> newp( p );
-// std::shared_ptr<ThreadPoolProvider> curp = std::atomic_load_explicit( &provider, std::memory_order_relaxed );
-// do
-// {
-// if ( ! std::atomic_compare_exchange_weak_explicit( &provider, &curp, newp, std::memory_order_release, std::memory_order_relaxed ) )
-// continue;
-// } while ( false );
-// if ( curp )
-// curp->finish();
+ setProvider (nullptr);
}
#endif // ENABLE_THREADING
@@ -608,7 +446,7 @@ Task::group ()
TaskGroup::TaskGroup ():
#ifdef ENABLE_THREADING
- _data (new Data())
+ _data (new Data)
#else
_data (nullptr)
#endif
@@ -620,6 +458,7 @@ TaskGroup::TaskGroup ():
TaskGroup::~TaskGroup ()
{
#ifdef ENABLE_THREADING
+ _data->waitForEmpty ();
delete _data;
#endif
}
@@ -660,10 +499,7 @@ ThreadPool::ThreadPool (unsigned nthreads):
#endif
{
#ifdef ENABLE_THREADING
- if ( nthreads == 0 )
- _data->setProvider( new NullThreadPoolProvider );
- else
- _data->setProvider( new DefaultThreadPoolProvider( int(nthreads) ) );
+ setNumThreads (static_cast<int> (nthreads));
#endif
}
@@ -671,6 +507,8 @@ ThreadPool::ThreadPool (unsigned nthreads):
ThreadPool::~ThreadPool ()
{
#ifdef ENABLE_THREADING
+ // ensures any jobs / threads are finished & shutdown
+ _data->setProvider (nullptr);
delete _data;
#endif
}
@@ -680,7 +518,8 @@ int
ThreadPool::numThreads () const
{
#ifdef ENABLE_THREADING
- return _data->getProvider ()->numThreads ();
+ Data::ProviderPtr sp = _data->getProvider ();
+ return (sp) ? sp->numThreads () : 0;
#else
return 0;
#endif
@@ -695,36 +534,30 @@ ThreadPool::setNumThreads (int count)
throw IEX_INTERNAL_NAMESPACE::ArgExc ("Attempt to set the number of threads "
"in a thread pool to a negative value.");
- bool doReset = false;
{
- Data::SafeProvider sp = _data->getProvider ();
- int curT = sp->numThreads ();
- if ( curT == count )
- return;
-
- if ( curT == 0 )
+ Data::ProviderPtr sp = _data->getProvider ();
+ if (sp)
{
- NullThreadPoolProvider *npp = dynamic_cast<NullThreadPoolProvider *>( sp.get() );
- if ( npp )
- doReset = true;
- }
- else if ( count == 0 )
- {
- DefaultThreadPoolProvider *dpp = dynamic_cast<DefaultThreadPoolProvider *>( sp.get() );
- if ( dpp )
- doReset = true;
+ bool doReset = false;
+ int curT = sp->numThreads ();
+ if (curT == count) return;
+
+ if (count != 0)
+ {
+ sp->setNumThreads (count);
+ return;
+ }
}
- if ( ! doReset )
- sp->setNumThreads( count );
}
- if ( doReset )
- {
- if ( count == 0 )
- _data->setProvider( new NullThreadPoolProvider );
- else
- _data->setProvider( new DefaultThreadPoolProvider( count ) );
- }
+ // either a null provider or a case where we should switch from
+ // a default provider to a null one or vice-versa
+ if (count == 0)
+ _data->setProvider (nullptr);
+ else
+ _data->setProvider (
+ std::make_shared<DefaultThreadPoolProvider> (count));
+
#else
// just blindly ignore
(void)count;
@@ -736,7 +569,8 @@ void
ThreadPool::setThreadProvider (ThreadPoolProvider *provider)
{
#ifdef ENABLE_THREADING
- _data->setProvider (provider);
+ // contract is we take ownership and will free the provider
+ _data->setProvider (Data::ProviderPtr (provider));
#else
throw IEX_INTERNAL_NAMESPACE::ArgExc (
"Attempt to set a thread provider on a system with threads"
@@ -748,12 +582,19 @@ ThreadPool::setThreadProvider (ThreadPoolProvider *provider)
void
ThreadPool::addTask (Task* task)
{
+ if (task)
+ {
#ifdef ENABLE_THREADING
- _data->getProvider ()->addTask (task);
-#else
- task->execute ();
- delete task;
+ Data::ProviderPtr p = _data->getProvider ();
+ if (p)
+ {
+ p->addTask (task);
+ return;
+ }
#endif
+
+ handleProcessTask (task);
+ }
}
@@ -780,7 +621,24 @@ unsigned
ThreadPool::estimateThreadCountForFileIO ()
{
#ifdef ENABLE_THREADING
- return std::thread::hardware_concurrency ();
+ unsigned rv = std::thread::hardware_concurrency ();
+ // hardware concurrency is not required to work
+ if (rv == 0 ||
+ rv > static_cast<unsigned> (std::numeric_limits<int>::max ()))
+ {
+ rv = 1;
+# if (defined(_WIN32) || defined(_WIN64))
+ SYSTEM_INFO si;
+ GetNativeSystemInfo (&si);
+
+ rv = si.dwNumberOfProcessors;
+# else
+ // linux, bsd, and mac are fine with this
+ // other *nix should be too, right?
+ rv = sysconf (_SC_NPROCESSORS_ONLN);
+# endif
+ }
+ return rv;
#else
return 0;
#endif
diff --git a/src/lib/IlmThread/IlmThreadSemaphore.h b/src/lib/IlmThread/IlmThreadSemaphore.h
index f26e48a09..576968aa6 100644
--- a/src/lib/IlmThread/IlmThreadSemaphore.h
+++ b/src/lib/IlmThread/IlmThreadSemaphore.h
@@ -64,10 +64,10 @@ class ILMTHREAD_EXPORT_TYPE Semaphore
mutable HANDLE _semaphore;
#elif ILMTHREAD_THREADING_ENABLED
- //
- // If the platform has threads but no semapohores,
- // then we implement them ourselves using condition variables
- //
+ //
+ // If the platform has threads but no semaphores,
+ // then we implement them ourselves using condition variables
+ //
struct sema_t
{