From b18905772e9dd98658f8a37d16c6e53c7c17adaa Mon Sep 17 00:00:00 2001
From: Kimball Thurston <kdt3rd@gmail.com>
Date: Sat, 22 Apr 2023 15:58:43 +1200
Subject: [PATCH] Change setNumThreads to wait for thread start (#1291)

Fix Issue #890, an issue with windows shutdown when exiting quickly
before the pool threads have actually started. We do this by passing
a data block to each thread via shared_ptr, so a late-starting thread
never dereferences a deleted object.
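
An illustrative sketch of that ownership pattern (hypothetical names,
not the symbols from this patch): each worker owns a shared_ptr copy
of the state block, so the block outlives the pool object even when a
thread is still starting up as the pool is torn down.

    #include <atomic>
    #include <memory>
    #include <thread>

    struct PoolState
    {
        std::atomic<bool> stopping{false};
    };

    int
    main ()
    {
        auto state = std::make_shared<PoolState> ();
        // the by-value capture copies the shared_ptr, keeping *state
        // alive for the worker's full lifetime
        std::thread worker ([state] () {
            while (!state->stopping.load ())
                std::this_thread::yield ();
        });
        state->stopping = true;
        worker.join ();
    }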

Further, greatly simplify the ThreadPool code by using the atomic
shared_ptr functions instead of trying to manually implement
something similar, and otherwise modernize the thread code.
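
The heart of that simplification is the C++11 atomic shared_ptr free
functions (C++20 spells this std::atomic<std::shared_ptr<T>>). A
condensed, illustrative sketch of the provider swap, reduced from the
ThreadPool::Data changes below (Provider stands in for
ThreadPoolProvider):

    #include <memory>

    struct Provider
    {
        void finish () {} // would wait for workers to shut down
    };

    static std::shared_ptr<Provider> g_provider;

    void
    setProvider (std::shared_ptr<Provider> p)
    {
        // atomically publish the new provider and fetch the old one;
        // the shared_ptr use count replaces the manual provUsers counter
        std::shared_ptr<Provider> curp = std::atomic_exchange (&g_provider, p);
        if (curp && curp != p) curp->finish ();
    }

    std::shared_ptr<Provider>
    getProvider ()
    {
        return std::atomic_load (&g_provider);
    }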

Fix a few potential null dereference locations, and also fix an issue
under heavy system load where the TaskGroup destructor could destroy
the semaphore while another thread was still using it, causing
undefined memory corruption if some other thread immediately
allocates that same block.
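
The fix pairs the existing "inverted" semaphore with a second
in-flight counter that brackets the post. A condensed sketch of the
TaskGroup::Data logic below (Semaphore is IlmThread's semaphore type,
here with an initial count of 1; assumes <atomic> and <thread>):

    struct GroupData
    {
        std::atomic<int> numPending{0};
        std::atomic<int> inFlight{0};
        Semaphore        isEmpty{1}; // signalled when the group is empty

        void removeTask ()
        {
            // last task: wake any waiter...
            if (numPending.fetch_sub (1) == 1) isEmpty.post ();
            // ...then mark the post as fully finished, so the waiter
            // cannot destroy the semaphore while post () is running
            inFlight.fetch_sub (1);
        }

        void waitForEmpty ()
        {
            isEmpty.wait ();
            // pseudo spin until no notifier is still inside post ()
            while (inFlight.load () > 0)
                std::this_thread::yield ();
        }
    };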

Originally based on a proposed patch by Dieter De Baets @debaetsd

Signed-off-by: Kimball Thurston <kdt3rd@gmail.com>
---
 src/lib/IlmThread/IlmThreadPool.cpp    | 740 ++++++++++---------------
 src/lib/IlmThread/IlmThreadSemaphore.h |   8 +-
 2 files changed, 303 insertions(+), 445 deletions(-)

diff --git a/src/lib/IlmThread/IlmThreadPool.cpp b/src/lib/IlmThread/IlmThreadPool.cpp
index 0ddcf8d52..f4578a510 100644
--- a/src/lib/IlmThread/IlmThreadPool.cpp
+++ b/src/lib/IlmThread/IlmThreadPool.cpp
@@ -15,12 +15,17 @@
 #include "Iex.h"

 #include <atomic>
+#include <limits>
 #include <memory>
 #include <mutex>
 #include <vector>
 #include <thread>

-using namespace std;
+#if (defined(_WIN32) || defined(_WIN64))
+#    include <windows.h>
+#else
+#    include <unistd.h>
+#endif

 ILMTHREAD_INTERNAL_NAMESPACE_SOURCE_ENTER

@@ -28,380 +33,301 @@ ILMTHREAD_INTERNAL_NAMESPACE_SOURCE_ENTER
 #    define ENABLE_THREADING
 #endif

-#if defined(__GNU_LIBRARY__) && ( __GLIBC__ < 2 || ( __GLIBC__ == 2 && __GLIBC_MINOR__ < 21 ) )
-#    define ENABLE_SEM_DTOR_WORKAROUND
-#endif
-
-#ifdef ENABLE_THREADING
-
-struct TaskGroup::Data
+namespace
 {
-    Data ();
-    ~Data ();
-
-    void addTask () ;
-    void removeTask ();
-    std::atomic<int> numPending;
-    Semaphore isEmpty; // used to signal that the taskgroup is empty
-#if defined(ENABLE_SEM_DTOR_WORKAROUND)
-    // this mutex is also used to lock numPending in the legacy c++ mode...
-    std::mutex dtorMutex; // used to work around the glibc bug:
-                          // http://sources.redhat.com/bugzilla/show_bug.cgi?id=12674
-#endif
-};

-
-struct ThreadPool::Data
+static inline void
+handleProcessTask (Task* task)
 {
-    typedef ThreadPoolProvider *TPPointer;
-
-    Data ();
-    ~Data();
-    Data (const Data&) = delete;
-    Data &operator= (const Data&) = delete;
-    Data (Data&&) = delete;
-    Data &operator= (Data&&) = delete;
-
-    struct SafeProvider
+    if (task)
     {
-        SafeProvider (Data *d, ThreadPoolProvider *p) : _data( d ), _ptr( p )
-        {
-        }
-
-        ~SafeProvider()
-        {
-            if ( _data )
-                _data->coalesceProviderUse();
-        }
-        SafeProvider (const SafeProvider &o)
-            : _data( o._data ), _ptr( o._ptr )
-        {
-            if ( _data )
-                _data->bumpProviderUse();
-        }
-        SafeProvider &operator= (const SafeProvider &o)
-        {
-            if ( this != &o )
-            {
-                if ( o._data )
-                    o._data->bumpProviderUse();
-                if ( _data )
-                    _data->coalesceProviderUse();
-                _data = o._data;
-                _ptr = o._ptr;
-            }
-            return *this;
-        }
-        SafeProvider( SafeProvider &&o )
-            : _data( o._data ), _ptr( o._ptr )
-        {
-            o._data = nullptr;
-        }
-        SafeProvider &operator=( SafeProvider &&o )
-        {
-            std::swap( _data, o._data );
-            std::swap( _ptr, o._ptr );
-            return *this;
-        }
-
-        inline ThreadPoolProvider *get () const
-        {
-            return _ptr;
-        }
-        ThreadPoolProvider *operator-> () const
-        {
-            return get();
-        }
-
-        Data *_data;
-        ThreadPoolProvider *_ptr;
-    };
-
-    // NB: In C++20, there is full support for atomic shared_ptr, but that is not
-    // yet in use or finalized. Once stabilized, add appropriate usage here
-    inline SafeProvider getProvider ();
-    inline void coalesceProviderUse ();
-    inline void bumpProviderUse ();
-    inline void setProvider (ThreadPoolProvider *p);
-
-    std::atomic<int> provUsers;
-    std::atomic<ThreadPoolProvider *> provider;
-};
-
+        TaskGroup* taskGroup = task->group ();

+        task->execute ();

-namespace {
+        // kill the task prior to notifying the group
+        // such that any internal reference-based
+        // semantics will be handled prior to
+        // the task group destructor letting it out
+        // of the scope of those references
+        delete task;

-class DefaultWorkerThread;
+        if (taskGroup) taskGroup->finishOneTask ();
+    }
+}

-struct DefaultWorkData
+#ifdef ENABLE_THREADING
+struct DefaultThreadPoolData
 {
-    Semaphore taskSemaphore; // threads wait on this for ready tasks
-    mutable std::mutex taskMutex; // mutual exclusion for the tasks list
-    vector<Task*> tasks; // the list of tasks to execute
+    Semaphore          _taskSemaphore; // threads wait on this for ready tasks
+    mutable std::mutex _taskMutex;     // mutual exclusion for the tasks list
+    std::vector<Task*> _tasks;         // the list of tasks to execute

-    Semaphore threadSemaphore; // signaled when a thread starts executing
-    mutable std::mutex threadMutex; // mutual exclusion for threads list
-    vector<DefaultWorkerThread*> threads; // the list of all threads
-
-    std::atomic<bool> hasThreads;
-    std::atomic<bool> stopping;
+    mutable std::mutex       _threadMutex; // mutual exclusion for threads list
+    std::vector<std::thread> _threads;     // the list of all threads
+
+    std::atomic<int>  _threadCount;
+    std::atomic<bool> _stopping;

     inline bool stopped () const
     {
-        return stopping.load( std::memory_order_relaxed );
+        return _stopping.load (std::memory_order_relaxed);
     }

-    inline void stop ()
+    inline void stop () { _stopping = true; }
+
+    inline void resetAtomics ()
     {
-        stopping = true;
+        _threadCount = 0;
+        _stopping    = false;
     }
 };
+#endif

-//
-// class WorkerThread
-//
-class DefaultWorkerThread: public Thread
+} // namespace
+
+#ifdef ENABLE_THREADING
+
+struct TaskGroup::Data
 {
-  public:
+    Data ();
+    ~Data ();
+    Data (const Data&) = delete;
+    Data& operator= (const Data&) = delete;
+    Data (Data&&) = delete;
+    Data& operator= (Data&&) = delete;

-    DefaultWorkerThread (DefaultWorkData* data);
+    void addTask ();
+    void removeTask ();

-    virtual void run ();
-
-  private:
+    void waitForEmpty ();

-    DefaultWorkData * _data;
+    std::atomic<int> numPending;
+    std::atomic<int> inFlight;
+    Semaphore isEmpty; // used to signal that the taskgroup is empty
 };


-DefaultWorkerThread::DefaultWorkerThread (DefaultWorkData* data):
-    _data (data)
+struct ThreadPool::Data
 {
-    start();
-}
-
+    using ProviderPtr = std::shared_ptr<ThreadPoolProvider>;

-void
-DefaultWorkerThread::run ()
-{
-    //
-    // Signal that the thread has started executing
-    //
+    Data ();
+    ~Data ();
+    Data (const Data&) = delete;
+    Data& operator= (const Data&) = delete;
+    Data (Data&&) = delete;
+    Data& operator= (Data&&) = delete;

-    _data->threadSemaphore.post();
+    ProviderPtr getProvider () const { return std::atomic_load (&_provider); }

-    while (true)
+    void setProvider (ProviderPtr provider)
     {
-        //
-        // Wait for a task to become available
-        //
-
-        _data->taskSemaphore.wait();
-
-        {
-            std::unique_lock<std::mutex> taskLock (_data->taskMutex);
-
-            //
-            // If there is a task pending, pop off the next task in the FIFO
-            //
+        ProviderPtr curp = std::atomic_exchange (&_provider, provider);
+        if (curp && curp != provider) curp->finish ();
+    }

-            if (!_data->tasks.empty())
-            {
-                Task* task = _data->tasks.back();
-                _data->tasks.pop_back();
-                // release the mutex while we process
-                taskLock.unlock();
+    std::shared_ptr<ThreadPoolProvider> _provider;
+};

-                TaskGroup* taskGroup = task->group();
-                task->execute();

-                delete task;
-
-                taskGroup->_data->removeTask ();
-            }
-            else if (_data->stopped())
-            {
-                break;
-            }
-        }
-    }
-}

+namespace {

 //
 // class DefaultThreadPoolProvider
 //
 class DefaultThreadPoolProvider : public ThreadPoolProvider
 {
-  public:
-    DefaultThreadPoolProvider(int count);
-    virtual ~DefaultThreadPoolProvider();
+public:
+    DefaultThreadPoolProvider (int count);
+    DefaultThreadPoolProvider (const DefaultThreadPoolProvider&) = delete;
+    DefaultThreadPoolProvider&
+    operator= (const DefaultThreadPoolProvider&) = delete;
+    DefaultThreadPoolProvider (DefaultThreadPoolProvider&&) = delete;
+    DefaultThreadPoolProvider& operator= (DefaultThreadPoolProvider&&) = delete;
+    ~DefaultThreadPoolProvider () override;
+
+    int numThreads () const override;
+    void setNumThreads (int count) override;
+    void addTask (Task* task) override;

-    virtual int numThreads() const;
-    virtual void setNumThreads(int count);
-    virtual void addTask(Task *task);
+    void finish () override;

-    virtual void finish();
+private:
+    void lockedFinish ();
+    void threadLoop (std::shared_ptr<DefaultThreadPoolData> d);

-  private:
-    DefaultWorkData _data;
+    std::shared_ptr<DefaultThreadPoolData> _data;
 };

 DefaultThreadPoolProvider::DefaultThreadPoolProvider (int count)
+    : _data (std::make_shared<DefaultThreadPoolData> ())
 {
-    setNumThreads(count);
+    _data->resetAtomics ();
+    setNumThreads (count);
 }

 DefaultThreadPoolProvider::~DefaultThreadPoolProvider ()
-{
-    finish();
-}
+{}

 int
 DefaultThreadPoolProvider::numThreads () const
 {
-    std::lock_guard<std::mutex> lock (_data.threadMutex);
-    return static_cast<int> (_data.threads.size());
+    return _data->_threadCount.load ();
 }

 void
 DefaultThreadPoolProvider::setNumThreads (int count)
 {
-    //
-    // Lock access to thread list and size
-    //
+    // since we're a private class, the thread pool won't call us if
+    // we aren't changing size so no need to check that...

-    std::lock_guard<std::mutex> lock (_data.threadMutex);
+    std::lock_guard<std::mutex> lock (_data->_threadMutex);

-    size_t desired = static_cast<size_t>(count);
-    if (desired > _data.threads.size())
-    {
-        //
-        // Add more threads
-        //
+    size_t curThreads = _data->_threads.size ();
+    size_t nToAdd     = static_cast<size_t> (count);

-        while (_data.threads.size() < desired)
-            _data.threads.push_back (new DefaultWorkerThread (&_data));
-    }
-    else if ((size_t)count < _data.threads.size())
+    if (nToAdd < curThreads)
     {
-        //
-        // Wait until all existing threads are finished processing,
-        // then delete all threads.
-        //
-        finish ();
-
-        //
-        // Add in new threads
-        //
-
-        while (_data.threads.size() < desired)
-            _data.threads.push_back (new DefaultWorkerThread (&_data));
+        // no easy way to only shutdown the n threads at the end of
+        // the vector (well, really, guaranteeing they are the ones to
+        // be woken up), so just kill all of the threads
+        lockedFinish ();
+        curThreads = 0;
     }

-    _data.hasThreads = !(_data.threads.empty());
+    _data->_threads.resize (nToAdd);
+    for (size_t i = curThreads; i < nToAdd; ++i)
+    {
+        _data->_threads[i] =
+            std::thread (&DefaultThreadPoolProvider::threadLoop, this, _data);
+    }
+    _data->_threadCount = static_cast<int> (_data->_threads.size ());
 }

 void
 DefaultThreadPoolProvider::addTask (Task *task)
 {
-    //
-    // Lock the threads, needed to access numThreads
-    //
-    bool doPush = _data.hasThreads.load( std::memory_order_relaxed );
-
-    if ( doPush )
+    // the thread pool will kill us and switch to a null provider
+    // if the thread count is set to 0, so we can always
+    // go ahead and lock and assume we have a thread to do the
+    // processing
     {
-        //
-        // Get exclusive access to the tasks queue
-        //
+        std::lock_guard<std::mutex> taskLock (_data->_taskMutex);

-        {
-            std::lock_guard<std::mutex> taskLock (_data.taskMutex);
-
-            //
-            // Push the new task into the FIFO
-            //
-            _data.tasks.push_back (task);
-        }
-
         //
-        // Signal that we have a new task to process
+        // Push the new task into the FIFO
         //
-        _data.taskSemaphore.post ();
-    }
-    else
-    {
-        // this path shouldn't normally happen since we have the
-        // NullThreadPoolProvider, but just in case...
-        task->execute ();
-        task->group()->_data->removeTask ();
-        delete task;
+        _data->_tasks.push_back (task);
     }
+
+    //
+    // Signal that we have a new task to process
+    //
+    _data->_taskSemaphore.post ();
 }

 void
 DefaultThreadPoolProvider::finish ()
 {
-    _data.stop();
+    std::lock_guard<std::mutex> lock (_data->_threadMutex);
+
+    lockedFinish ();
+}
+
+void
+DefaultThreadPoolProvider::lockedFinish ()
+{
+    _data->stop ();

     //
     // Signal enough times to allow all threads to stop.
     //
-    // Wait until all threads have started their run functions.
-    // If we do not wait before we destroy the threads then it's
-    // possible that the threads have not yet called their run
-    // functions.
-    // If this happens then the run function will be called off
-    // of an invalid object and we will crash, most likely with
-    // an error like: "pure virtual method called"
+    // NB: we must do this as many times as we have threads.
     //
-
-    size_t curT = _data.threads.size();
+    // If there is still work in the queue, or this call happens "too
+    // quickly", threads will not be waiting on the semaphore, so we
+    // need to ensure the semaphore is at a count equal to the amount
+    // of work left plus the number of threads to ensure exit of a
+    // thread. There can be threads in a few states:
+    //   - still starting up (successive calls to setNumThreads)
+    //   - in the middle of processing a task / looping
+    //   - waiting in the semaphore
+    size_t curT = _data->_threads.size ();
     for (size_t i = 0; i != curT; ++i)
-    {
-        if (_data.threads[i]->joinable())
-        {
-            _data.taskSemaphore.post();
-            _data.threadSemaphore.wait();
-        }
-    }
+        _data->_taskSemaphore.post ();

     //
-    // Join all the threads
+    // We should not need to check joinability; they should all, by
+    // definition, be joinable (assuming normal start)
     //
     for (size_t i = 0; i != curT; ++i)
     {
-        if (_data.threads[i]->joinable())
-            _data.threads[i]->join();
-        delete _data.threads[i];
+        // This isn't quite right in that the thread may actually
+        // be in an exited / signalled state (needing the
+        // WaitForSingleObject call), and so already have an exit code
+        // (I think, but the docs are vague), but if we don't do the
+        // join, the stl thread seems to then throw an exception. The
+        // join should just return invalid handle and continue, and is
+        // more of a windows bug... except maybe someone needs to work
+        // around it...
+        //#    ifdef TEST_FOR_WIN_THREAD_STATUS
+        //
+        //        // per OIIO issue #2038, on exit / dll unload, windows may
+        //        // kill the thread, double check that it is still active prior
+        //        // to joining.
+        //        DWORD tstatus;
+        //        if (GetExitCodeThread (_threads[i].native_handle (), &tstatus))
+        //        {
+        //            if (tstatus != STILL_ACTIVE) { continue; }
+        //        }
+        //#    endif
+
+        _data->_threads[i].join ();
     }

-    std::lock_guard<std::mutex> lk( _data.taskMutex );
-
-    _data.threads.clear();
-    _data.tasks.clear();
+    _data->_threads.clear ();

-    _data.stopping = false;
+    _data->resetAtomics ();
 }

-
-class NullThreadPoolProvider : public ThreadPoolProvider
+void
+DefaultThreadPoolProvider::threadLoop (
+    std::shared_ptr<DefaultThreadPoolData> data)
 {
-    virtual ~NullThreadPoolProvider() {}
-    virtual int numThreads () const { return 0; }
-    virtual void setNumThreads (int count)
-    {
-    }
-    virtual void addTask (Task *t)
+    while (true)
     {
-        t->execute ();
-        t->group()->_data->removeTask ();
-        delete t;
+        //
+        // Wait for a task to become available
+        //
+
+        data->_taskSemaphore.wait ();
+
+        {
+            std::unique_lock<std::mutex> taskLock (data->_taskMutex);
+
+            //
+            // If there is a task pending, pop off the next task in the FIFO
+            //
+
+            if (!data->_tasks.empty ())
+            {
+                Task* task = data->_tasks.back ();
+                data->_tasks.pop_back ();
+
+                // release the mutex while we process
+                taskLock.unlock ();
+
+                handleProcessTask (task);
+
+                // do not need to reacquire the lock at all since we
+                // will just loop around, pull any other task
+            }
+            else if (data->stopped ()) { break; }
+        }
     }
-    virtual void finish () {}
-};
+}

 } //namespace

@@ -409,81 +335,69 @@ class NullThreadPoolProvider : public ThreadPoolProvider
 // struct TaskGroup::Data
 //

-TaskGroup::Data::Data () : numPending (0), isEmpty (1)
-{
-    // empty
-}
+TaskGroup::Data::Data () : numPending (0), inFlight (0), isEmpty (1)
+{}


 TaskGroup::Data::~Data ()
+{}
+
+void
+TaskGroup::Data::waitForEmpty ()
 {
     //
     // A TaskGroup acts like an "inverted" semaphore: if the count
-    // is above 0 then waiting on the taskgroup will block. This
+    // is above 0 then waiting on the taskgroup will block. The
     // destructor waits until the taskgroup is empty before returning.
     //

     isEmpty.wait ();

-#ifdef ENABLE_SEM_DTOR_WORKAROUND
-    // Update: this was fixed in v. 2.2.21, so this ifdef checks for that
-    //
-    // Alas, given the current bug in glibc we need a secondary
-    // syncronisation primitive here to account for the fact that
-    // destructing the isEmpty Semaphore in this thread can cause
-    // an error for a separate thread that is issuing the post() call.
-    // We are entitled to destruct the semaphore at this point, however,
-    // that post() call attempts to access data out of the associated
-    // memory *after* it has woken the waiting threads, including this one,
-    // potentially leading to invalid memory reads.
-    // http://sources.redhat.com/bugzilla/show_bug.cgi?id=12674
-
-    std::lock_guard<std::mutex> lock (dtorMutex);
-#endif
+    // pseudo spin to wait for the notifying thread to finish the post
+    // to avoid a premature deletion of the semaphore
+    int count = 0;
+    while (inFlight.load () > 0)
+    {
+        ++count;
+        if (count > 100)
+        {
+            std::this_thread::yield ();
+            count = 0;
+        }
+    }
 }


 void
 TaskGroup::Data::addTask ()
 {
-    //
-    // in c++11, we use an atomic to protect numPending to avoid the
-    // extra lock but for c++98, to add the ability for custom thread
-    // pool we add the lock here
-    //
-    if (numPending++ == 0)
-        isEmpty.wait ();
+    inFlight.fetch_add (1);
+
+    // if we are the first task off the rank, clear the
+    // isEmpty semaphore such that the group will actually pause
+    // until the task finishes
+    if (numPending.fetch_add (1) == 0) { isEmpty.wait (); }
 }


 void
 TaskGroup::Data::removeTask ()
 {
-    // Alas, given the current bug in glibc we need a secondary
-    // syncronisation primitive here to account for the fact that
-    // destructing the isEmpty Semaphore in a separate thread can
-    // cause an error. Issuing the post call here the current libc
-    // implementation attempts to access memory *after* it has woken
-    // waiting threads.
-    // Since other threads are entitled to delete the semaphore the
-    // access to the memory location can be invalid.
-    // http://sources.redhat.com/bugzilla/show_bug.cgi?id=12674
-    // Update: this bug has been fixed, but how do we know which
-    // glibc version we're in?
-
-    // Further update:
+    // if we are the last task, notify the group we're done
+    if (numPending.fetch_sub (1) == 1) { isEmpty.post (); }
+
+    // in theory, a background thread could actually finish a task
+    // prior to the next task being added. The fetch_add / fetch_sub
+    // logic between addTask and removeTask is fine to keep the
+    // inverted semaphore straight. All addTask must happen prior to
+    // the TaskGroup destructor.
     //
-    // we could remove this if it is a new enough glibc, however
-    // we've changed the API to enable a custom override of a
-    // thread pool. In order to provide safe access to the numPending,
-    // we need the lock anyway, except for c++11 or newer
-    if (--numPending == 0)
-    {
-#ifdef ENABLE_SEM_DTOR_WORKAROUND
-        std::lock_guard<std::mutex> lk (dtorMutex);
-#endif
-        isEmpty.post ();
-    }
+    // But to let the waiting taskgroup thread know we're actually
+    // finished with the last one and finished posting (the semaphore
+    // might wake up the other thread while in the middle of post) so
+    // we don't destroy the semaphore while posting to it, keep a
+    // separate counter that is modified pre / post semaphore
+    inFlight.fetch_sub (1);
 }


@@ -491,8 +405,7 @@ TaskGroup::Data::removeTask ()
 // struct ThreadPool::Data
 //

-ThreadPool::Data::Data ():
-    provUsers (0), provider (NULL)
+ThreadPool::Data::Data ()
 {
     // empty
 }
@@ -500,82 +413,7 @@ ThreadPool::Data::Data ():

 ThreadPool::Data::~Data()
 {
-    ThreadPoolProvider *p = provider.load( std::memory_order_relaxed );
-    p->finish();
-    delete p;
-}
-
-inline ThreadPool::Data::SafeProvider
-ThreadPool::Data::getProvider ()
-{
-    provUsers.fetch_add( 1, std::memory_order_relaxed );
-    return SafeProvider( this, provider.load( std::memory_order_relaxed ) );
-}
-
-
-inline void
-ThreadPool::Data::coalesceProviderUse ()
-{
-    int ov = provUsers.fetch_sub( 1, std::memory_order_relaxed );
-    // ov is the previous value, so one means that now it might be 0
-    if ( ov == 1 )
-    {
-        // do we have anything to do here?
-    }
-}
-
-
-inline void
-ThreadPool::Data::bumpProviderUse ()
-{
-    provUsers.fetch_add( 1, std::memory_order_relaxed );
-}
-
-
-inline void
-ThreadPool::Data::setProvider (ThreadPoolProvider *p)
-{
-    ThreadPoolProvider *old = provider.load( std::memory_order_relaxed );
-    // work around older gcc bug just in case
-    do
-    {
-        if ( ! provider.compare_exchange_weak( old, p, std::memory_order_release, std::memory_order_relaxed ) )
-            continue;
-    } while (false); // NOSONAR - suppress SonarCloud bug report.
-
-    // wait for any other users to finish prior to deleting, given
-    // that these are just mostly to query the thread count or push a
-    // task to the queue (so fast), just spin...
-    //
-    // (well, and normally, people don't do this mid stream anyway, so
-    // this will be 0 99.999% of the time, but just to be safe)
-    //
-    while ( provUsers.load( std::memory_order_relaxed ) > 0 )
-        std::this_thread::yield();
-
-    if ( old )
-    {
-        old->finish();
-        delete old;
-    }
-
-    // NB: the shared_ptr mechanism is safer and means we don't have
-    // to have the provUsers counter since the shared_ptr keeps that
-    // for us. However, gcc 4.8/9 compilers which many people are
-    // still using even though it is 2018 forgot to add the shared_ptr
-    // functions... once that compiler is fully deprecated, switch to
-    // using the below, change provider to a std::shared_ptr and remove
-    // provUsers...
-    //
-//    std::shared_ptr<ThreadPoolProvider> newp( p );
-//    std::shared_ptr<ThreadPoolProvider> curp = std::atomic_load_explicit( &provider, std::memory_order_relaxed );
-//    do
-//    {
-//        if ( ! std::atomic_compare_exchange_weak_explicit( &provider, &curp, newp, std::memory_order_release, std::memory_order_relaxed ) )
-//            continue;
-//    } while ( false );
-//    if ( curp )
-//        curp->finish();
+    setProvider (nullptr);
 }

 #endif // ENABLE_THREADING
@@ -608,7 +446,7 @@ Task::group ()

 TaskGroup::TaskGroup ():
 #ifdef ENABLE_THREADING
-    _data (new Data())
+    _data (new Data)
 #else
     _data (nullptr)
 #endif
@@ -620,6 +458,7 @@ TaskGroup::TaskGroup ():
 TaskGroup::~TaskGroup ()
 {
 #ifdef ENABLE_THREADING
+    _data->waitForEmpty ();
     delete _data;
 #endif
 }
@@ -660,10 +499,7 @@ ThreadPool::ThreadPool (unsigned nthreads):
 #endif
 {
 #ifdef ENABLE_THREADING
-    if ( nthreads == 0 )
-        _data->setProvider( new NullThreadPoolProvider );
-    else
-        _data->setProvider( new DefaultThreadPoolProvider( int(nthreads) ) );
+    setNumThreads (static_cast<int> (nthreads));
 #endif
 }

@@ -671,6 +507,8 @@ ThreadPool::ThreadPool (unsigned nthreads):
 ThreadPool::~ThreadPool ()
 {
 #ifdef ENABLE_THREADING
+    // ensures any jobs / threads are finished & shutdown
+    _data->setProvider (nullptr);
     delete _data;
 #endif
 }
@@ -680,7 +518,8 @@ int
 ThreadPool::numThreads () const
 {
 #ifdef ENABLE_THREADING
-    return _data->getProvider ()->numThreads ();
+    Data::ProviderPtr sp = _data->getProvider ();
+    return (sp) ? sp->numThreads () : 0;
 #else
     return 0;
 #endif
@@ -695,36 +534,30 @@ ThreadPool::setNumThreads (int count)
         throw IEX_INTERNAL_NAMESPACE::ArgExc ("Attempt to set the number of threads "
                                               "in a thread pool to a negative value.");

-    bool doReset = false;
     {
-        Data::SafeProvider sp = _data->getProvider ();
-        int curT = sp->numThreads ();
-        if ( curT == count )
-            return;
-
-        if ( curT == 0 )
+        Data::ProviderPtr sp = _data->getProvider ();
+        if (sp)
         {
-            NullThreadPoolProvider *npp = dynamic_cast<NullThreadPoolProvider *>( sp.get() );
-            if ( npp )
-                doReset = true;
-        }
-        else if ( count == 0 )
-        {
-            DefaultThreadPoolProvider *dpp = dynamic_cast<DefaultThreadPoolProvider *>( sp.get() );
-            if ( dpp )
-                doReset = true;
+            bool doReset = false;
+            int curT = sp->numThreads ();
+            if (curT == count) return;
+
+            if (count != 0)
+            {
+                sp->setNumThreads (count);
+                return;
+            }
         }
-        if ( ! doReset )
-            sp->setNumThreads( count );
     }

-    if ( doReset )
-    {
-        if ( count == 0 )
-            _data->setProvider( new NullThreadPoolProvider );
-        else
-            _data->setProvider( new DefaultThreadPoolProvider( count ) );
-    }
+    // either a null provider or a case where we should switch from
+    // a default provider to a null one or vice-versa
+    if (count == 0)
+        _data->setProvider (nullptr);
+    else
+        _data->setProvider (
+            std::make_shared<DefaultThreadPoolProvider> (count));
+
 #else
     // just blindly ignore
     (void)count;
@@ -736,7 +569,8 @@ void
 ThreadPool::setThreadProvider (ThreadPoolProvider *provider)
 {
 #ifdef ENABLE_THREADING
-    _data->setProvider (provider);
+    // contract is we take ownership and will free the provider
+    _data->setProvider (Data::ProviderPtr (provider));
 #else
     throw IEX_INTERNAL_NAMESPACE::ArgExc (
         "Attempt to set a thread provider on a system with threads"
@@ -748,12 +582,19 @@ ThreadPool::setThreadProvider (ThreadPoolProvider *provider)
 void
 ThreadPool::addTask (Task* task)
 {
+    if (task)
+    {
 #ifdef ENABLE_THREADING
-    _data->getProvider ()->addTask (task);
-#else
-    task->execute ();
-    delete task;
+        Data::ProviderPtr p = _data->getProvider ();
+        if (p)
+        {
+            p->addTask (task);
+            return;
+        }
 #endif
+
+        handleProcessTask (task);
+    }
 }


@@ -780,7 +621,24 @@ unsigned
 ThreadPool::estimateThreadCountForFileIO ()
 {
 #ifdef ENABLE_THREADING
-    return std::thread::hardware_concurrency ();
+    unsigned rv = std::thread::hardware_concurrency ();
+    // hardware concurrency is not required to work
+    if (rv == 0 ||
+        rv > static_cast<unsigned> (std::numeric_limits<int>::max ()))
+    {
+        rv = 1;
+#    if (defined(_WIN32) || defined(_WIN64))
+        SYSTEM_INFO si;
+        GetNativeSystemInfo (&si);
+
+        rv = si.dwNumberOfProcessors;
+#    else
+        // linux, bsd, and mac are fine with this
+        // other *nix should be too, right?
+        rv = sysconf (_SC_NPROCESSORS_ONLN);
+#    endif
+    }
+    return rv;
 #else
     return 0;
 #endif
diff --git a/src/lib/IlmThread/IlmThreadSemaphore.h b/src/lib/IlmThread/IlmThreadSemaphore.h
index f26e48a09..576968aa6 100644
--- a/src/lib/IlmThread/IlmThreadSemaphore.h
+++ b/src/lib/IlmThread/IlmThreadSemaphore.h
@@ -64,10 +64,10 @@ class ILMTHREAD_EXPORT_TYPE Semaphore
     mutable HANDLE _semaphore;

 #elif ILMTHREAD_THREADING_ENABLED
-    //
-    // If the platform has threads but no semapohores,
-    // then we implement them ourselves using condition variables
-    //
+    //
+    // If the platform has threads but no semaphores,
+    // then we implement them ourselves using condition variables
+    //

     struct sema_t
     {