From b18905772e9dd98658f8a37d16c6e53c7c17adaa Mon Sep 17 00:00:00 2001 From: Kimball Thurston Date: Sat, 22 Apr 2023 15:58:43 +1200 Subject: [PATCH] Change setNumThreads to wait for thread start (#1291) Fix Issue #890, issue with windows shutdown when exiting quickly prior to threads actually starting. We do this by having a data block that is passed by shared_ptr to the thread to avoid dereferencing a deleted object. Further, greatly simplify the ThreadPool code by using atomic shared_ptr functions instead of trying to manually implement something similar and otherwise modernizing the thread code. Fix a few potential null dereference locations and also fix an issue when systems are overloaded enabling the TaskGroup destructor to destroy the semaphore while code is still using it, causing undefined memory corruption if some other thread immediately allocates that same block Originally based on a proposed patch by Dieter De Baets @debaetsd Signed-off-by: Kimball Thurston --- src/lib/IlmThread/IlmThreadPool.cpp | 740 ++++++++++--------------- src/lib/IlmThread/IlmThreadSemaphore.h | 8 +- 2 files changed, 303 insertions(+), 445 deletions(-) diff --git a/src/lib/IlmThread/IlmThreadPool.cpp b/src/lib/IlmThread/IlmThreadPool.cpp index 0ddcf8d52..f4578a510 100644 --- a/src/lib/IlmThread/IlmThreadPool.cpp +++ b/src/lib/IlmThread/IlmThreadPool.cpp @@ -15,12 +15,17 @@ #include "Iex.h" #include +#include #include #include #include #include -using namespace std; +#if (defined(_WIN32) || defined(_WIN64)) +# include +#else +# include +#endif ILMTHREAD_INTERNAL_NAMESPACE_SOURCE_ENTER @@ -28,380 +33,301 @@ ILMTHREAD_INTERNAL_NAMESPACE_SOURCE_ENTER # define ENABLE_THREADING #endif -#if defined(__GNU_LIBRARY__) && ( __GLIBC__ < 2 || ( __GLIBC__ == 2 && __GLIBC_MINOR__ < 21 ) ) -# define ENABLE_SEM_DTOR_WORKAROUND -#endif - -#ifdef ENABLE_THREADING - -struct TaskGroup::Data +namespace { - Data (); - ~Data (); - - void addTask () ; - void removeTask (); - std::atomic numPending; - 
Semaphore isEmpty; // used to signal that the taskgroup is empty -#if defined(ENABLE_SEM_DTOR_WORKAROUND) - // this mutex is also used to lock numPending in the legacy c++ mode... - std::mutex dtorMutex; // used to work around the glibc bug: - // http://sources.redhat.com/bugzilla/show_bug.cgi?id=12674 -#endif -}; - -struct ThreadPool::Data +static inline void +handleProcessTask (Task* task) { - typedef ThreadPoolProvider *TPPointer; - - Data (); - ~Data(); - Data (const Data&) = delete; - Data &operator= (const Data&) = delete; - Data (Data&&) = delete; - Data &operator= (Data&&) = delete; - - struct SafeProvider + if (task) { - SafeProvider (Data *d, ThreadPoolProvider *p) : _data( d ), _ptr( p ) - { - } - - ~SafeProvider() - { - if ( _data ) - _data->coalesceProviderUse(); - } - SafeProvider (const SafeProvider &o) - : _data( o._data ), _ptr( o._ptr ) - { - if ( _data ) - _data->bumpProviderUse(); - } - SafeProvider &operator= (const SafeProvider &o) - { - if ( this != &o ) - { - if ( o._data ) - o._data->bumpProviderUse(); - if ( _data ) - _data->coalesceProviderUse(); - _data = o._data; - _ptr = o._ptr; - } - return *this; - } - SafeProvider( SafeProvider &&o ) - : _data( o._data ), _ptr( o._ptr ) - { - o._data = nullptr; - } - SafeProvider &operator=( SafeProvider &&o ) - { - std::swap( _data, o._data ); - std::swap( _ptr, o._ptr ); - return *this; - } - - inline ThreadPoolProvider *get () const - { - return _ptr; - } - ThreadPoolProvider *operator-> () const - { - return get(); - } - - Data *_data; - ThreadPoolProvider *_ptr; - }; - - // NB: In C++20, there is full support for atomic shared_ptr, but that is not - // yet in use or finalized. 
Once stabilized, add appropriate usage here - inline SafeProvider getProvider (); - inline void coalesceProviderUse (); - inline void bumpProviderUse (); - inline void setProvider (ThreadPoolProvider *p); - - std::atomic provUsers; - std::atomic provider; -}; - + TaskGroup* taskGroup = task->group (); + task->execute (); -namespace { + // kill the task prior to notifying the group + // such that any internal reference-based + // semantics will be handled prior to + // the task group destructor letting it out + // of the scope of those references + delete task; -class DefaultWorkerThread; + if (taskGroup) taskGroup->finishOneTask (); + } +} -struct DefaultWorkData +#ifdef ENABLE_THREADING +struct DefaultThreadPoolData { - Semaphore taskSemaphore; // threads wait on this for ready tasks - mutable std::mutex taskMutex; // mutual exclusion for the tasks list - vector tasks; // the list of tasks to execute + Semaphore _taskSemaphore; // threads wait on this for ready tasks + mutable std::mutex _taskMutex; // mutual exclusion for the tasks list + std::vector _tasks; // the list of tasks to execute - Semaphore threadSemaphore; // signaled when a thread starts executing - mutable std::mutex threadMutex; // mutual exclusion for threads list - vector threads; // the list of all threads - - std::atomic hasThreads; - std::atomic stopping; + mutable std::mutex _threadMutex; // mutual exclusion for threads list + std::vector _threads; // the list of all threads + + std::atomic _threadCount; + std::atomic _stopping; inline bool stopped () const { - return stopping.load( std::memory_order_relaxed ); + return _stopping.load (std::memory_order_relaxed); } - inline void stop () + inline void stop () { _stopping = true; } + + inline void resetAtomics () { - stopping = true; + _threadCount = 0; + _stopping = false; } }; +#endif -// -// class WorkerThread -// -class DefaultWorkerThread: public Thread +} // namespace + +#ifdef ENABLE_THREADING + +struct TaskGroup::Data { - public: + Data 
(); + ~Data (); + Data (const Data&) = delete; + Data& operator= (const Data&) = delete; + Data (Data&&) = delete; + Data& operator= (Data&&) = delete; - DefaultWorkerThread (DefaultWorkData* data); + void addTask (); + void removeTask (); - virtual void run (); - - private: + void waitForEmpty (); - DefaultWorkData * _data; + std::atomic numPending; + std::atomic inFlight; + Semaphore isEmpty; // used to signal that the taskgroup is empty }; -DefaultWorkerThread::DefaultWorkerThread (DefaultWorkData* data): - _data (data) +struct ThreadPool::Data { - start(); -} - + using ProviderPtr = std::shared_ptr; -void -DefaultWorkerThread::run () -{ - // - // Signal that the thread has started executing - // + Data (); + ~Data (); + Data (const Data&) = delete; + Data& operator= (const Data&) = delete; + Data (Data&&) = delete; + Data& operator= (Data&&) = delete; - _data->threadSemaphore.post(); + ProviderPtr getProvider () const { return std::atomic_load (&_provider); } - while (true) + void setProvider (ProviderPtr provider) { - // - // Wait for a task to become available - // - - _data->taskSemaphore.wait(); - - { - std::unique_lock taskLock (_data->taskMutex); - - // - // If there is a task pending, pop off the next task in the FIFO - // + ProviderPtr curp = std::atomic_exchange (&_provider, provider); + if (curp && curp != provider) curp->finish (); + } - if (!_data->tasks.empty()) - { - Task* task = _data->tasks.back(); - _data->tasks.pop_back(); - // release the mutex while we process - taskLock.unlock(); + std::shared_ptr _provider; +}; - TaskGroup* taskGroup = task->group(); - task->execute(); - delete task; - - taskGroup->_data->removeTask (); - } - else if (_data->stopped()) - { - break; - } - } - } -} +namespace { // // class DefaultThreadPoolProvider // class DefaultThreadPoolProvider : public ThreadPoolProvider { - public: - DefaultThreadPoolProvider(int count); - virtual ~DefaultThreadPoolProvider(); +public: + DefaultThreadPoolProvider (int count); + 
DefaultThreadPoolProvider (const DefaultThreadPoolProvider&) = delete; + DefaultThreadPoolProvider& + operator= (const DefaultThreadPoolProvider&) = delete; + DefaultThreadPoolProvider (DefaultThreadPoolProvider&&) = delete; + DefaultThreadPoolProvider& operator= (DefaultThreadPoolProvider&&) = delete; + ~DefaultThreadPoolProvider () override; + + int numThreads () const override; + void setNumThreads (int count) override; + void addTask (Task* task) override; - virtual int numThreads() const; - virtual void setNumThreads(int count); - virtual void addTask(Task *task); + void finish () override; - virtual void finish(); +private: + void lockedFinish (); + void threadLoop (std::shared_ptr d); - private: - DefaultWorkData _data; + std::shared_ptr _data; }; DefaultThreadPoolProvider::DefaultThreadPoolProvider (int count) + : _data (std::make_shared ()) { - setNumThreads(count); + _data->resetAtomics (); + setNumThreads (count); } DefaultThreadPoolProvider::~DefaultThreadPoolProvider () -{ - finish(); -} +{} int DefaultThreadPoolProvider::numThreads () const { - std::lock_guard lock (_data.threadMutex); - return static_cast (_data.threads.size()); + return _data->_threadCount.load (); } void DefaultThreadPoolProvider::setNumThreads (int count) { - // - // Lock access to thread list and size - // + // since we're a private class, the thread pool won't call us if + // we aren't changing size so no need to check that... 
- std::lock_guard lock (_data.threadMutex); + std::lock_guard lock (_data->_threadMutex); - size_t desired = static_cast(count); - if (desired > _data.threads.size()) - { - // - // Add more threads - // + size_t curThreads = _data->_threads.size (); + size_t nToAdd = static_cast (count); - while (_data.threads.size() < desired) - _data.threads.push_back (new DefaultWorkerThread (&_data)); - } - else if ((size_t)count < _data.threads.size()) + if (nToAdd < curThreads) { - // - // Wait until all existing threads are finished processing, - // then delete all threads. - // - finish (); - - // - // Add in new threads - // - - while (_data.threads.size() < desired) - _data.threads.push_back (new DefaultWorkerThread (&_data)); + // no easy way to only shutdown the n threads at the end of + // the vector (well, really, guaranteeing they are the ones to + // be woken up), so just kill all of the threads + lockedFinish (); + curThreads = 0; } - _data.hasThreads = !(_data.threads.empty()); + _data->_threads.resize (nToAdd); + for (size_t i = curThreads; i < nToAdd; ++i) + { + _data->_threads[i] = + std::thread (&DefaultThreadPoolProvider::threadLoop, this, _data); + } + _data->_threadCount = static_cast (_data->_threads.size ()); } void DefaultThreadPoolProvider::addTask (Task *task) { - // - // Lock the threads, needed to access numThreads - // - bool doPush = _data.hasThreads.load( std::memory_order_relaxed ); - - if ( doPush ) + // the thread pool will kill us and switch to a null provider + // if the thread count is set to 0, so we can always + // go ahead and lock and assume we have a thread to do the + // processing { - // - // Get exclusive access to the tasks queue - // + std::lock_guard taskLock (_data->_taskMutex); - { - std::lock_guard taskLock (_data.taskMutex); - - // - // Push the new task into the FIFO - // - _data.tasks.push_back (task); - } - // - // Signal that we have a new task to process + // Push the new task into the FIFO // - _data.taskSemaphore.post 
(); - } - else - { - // this path shouldn't normally happen since we have the - // NullThreadPoolProvider, but just in case... - task->execute (); - task->group()->_data->removeTask (); - delete task; + _data->_tasks.push_back (task); } + + // + // Signal that we have a new task to process + // + _data->_taskSemaphore.post (); } void DefaultThreadPoolProvider::finish () { - _data.stop(); + std::lock_guard lock (_data->_threadMutex); + + lockedFinish (); +} + +void +DefaultThreadPoolProvider::lockedFinish () +{ + _data->stop (); // // Signal enough times to allow all threads to stop. // - // Wait until all threads have started their run functions. - // If we do not wait before we destroy the threads then it's - // possible that the threads have not yet called their run - // functions. - // If this happens then the run function will be called off - // of an invalid object and we will crash, most likely with - // an error like: "pure virtual method called" + // NB: we must do this as many times as we have threads. // - - size_t curT = _data.threads.size(); + // If there is still work in the queue, or this call happens "too + // quickly", threads will not be waiting on the semaphore, so we + // need to ensure the semaphore is at a count equal to the amount + // of work left plus the number of threads to ensure exit of a + // thread. 
There can be threads in a few states: + // - still starting up (successive calls to setNumThreads) + // - in the middle of processing a task / looping + // - waiting in the semaphore + size_t curT = _data->_threads.size (); for (size_t i = 0; i != curT; ++i) - { - if (_data.threads[i]->joinable()) - { - _data.taskSemaphore.post(); - _data.threadSemaphore.wait(); - } - } + _data->_taskSemaphore.post (); // - // Join all the threads + // We should not need to check joinability, they should all, by + // definition, be joinable (assuming normal start) // for (size_t i = 0; i != curT; ++i) { - if (_data.threads[i]->joinable()) - _data.threads[i]->join(); - delete _data.threads[i]; + // This isn't quite right in that the thread may have actually + // been in an exited / signalled state (needing the + // WaitForSingleObject call), and so already have an exit code + // (I think, but the docs are vague), but if we don't do the + // join, the stl thread seems to then throw an exception. The + // join should just return invalid handle and continue, and is + // more of a windows bug... except maybe someone needs to work + // around it... + //# ifdef TEST_FOR_WIN_THREAD_STATUS + // + // // per OIIO issue #2038, on exit / dll unload, windows may + // // kill the thread, double check that it is still active prior + // // to joining. 
+ // DWORD tstatus; + // if (GetExitCodeThread (_threads[i].native_handle (), &tstatus)) + // { + // if (tstatus != STILL_ACTIVE) { continue; } + // } + //# endif + + _data->_threads[i].join (); } - std::lock_guard lk( _data.taskMutex ); - - _data.threads.clear(); - _data.tasks.clear(); + _data->_threads.clear (); - _data.stopping = false; + _data->resetAtomics (); } - -class NullThreadPoolProvider : public ThreadPoolProvider +void +DefaultThreadPoolProvider::threadLoop ( + std::shared_ptr data) { - virtual ~NullThreadPoolProvider() {} - virtual int numThreads () const { return 0; } - virtual void setNumThreads (int count) - { - } - virtual void addTask (Task *t) + while (true) { - t->execute (); - t->group()->_data->removeTask (); - delete t; + // + // Wait for a task to become available + // + + data->_taskSemaphore.wait (); + + { + std::unique_lock taskLock (data->_taskMutex); + + // + // If there is a task pending, pop off the next task in the FIFO + // + + if (!data->_tasks.empty ()) + { + Task* task = data->_tasks.back (); + data->_tasks.pop_back (); + + // release the mutex while we process + taskLock.unlock (); + + handleProcessTask (task); + + // do not need to reacquire the lock at all since we + // will just loop around, pull any other task + } + else if (data->stopped ()) { break; } + } } - virtual void finish () {} -}; +} } //namespace @@ -409,81 +335,69 @@ class NullThreadPoolProvider : public ThreadPoolProvider // struct TaskGroup::Data // -TaskGroup::Data::Data () : numPending (0), isEmpty (1) -{ - // empty -} +TaskGroup::Data::Data () : numPending (0), inFlight (0), isEmpty (1) +{} TaskGroup::Data::~Data () +{} + +void +TaskGroup::Data::waitForEmpty () { // // A TaskGroup acts like an "inverted" semaphore: if the count - // is above 0 then waiting on the taskgroup will block. This + // is above 0 then waiting on the taskgroup will block. The // destructor waits until the taskgroup is empty before returning. 
// isEmpty.wait (); -#ifdef ENABLE_SEM_DTOR_WORKAROUND - // Update: this was fixed in v. 2.2.21, so this ifdef checks for that - // - // Alas, given the current bug in glibc we need a secondary - // syncronisation primitive here to account for the fact that - // destructing the isEmpty Semaphore in this thread can cause - // an error for a separate thread that is issuing the post() call. - // We are entitled to destruct the semaphore at this point, however, - // that post() call attempts to access data out of the associated - // memory *after* it has woken the waiting threads, including this one, - // potentially leading to invalid memory reads. - // http://sources.redhat.com/bugzilla/show_bug.cgi?id=12674 - - std::lock_guard lock (dtorMutex); -#endif + // pseudo spin to wait for the notifying thread to finish the post + // to avoid a premature deletion of the semaphore + int count = 0; + while (inFlight.load () > 0) + { + ++count; + if (count > 100) + { + std::this_thread::yield (); + count = 0; + } + } } void TaskGroup::Data::addTask () { - // - // in c++11, we use an atomic to protect numPending to avoid the - // extra lock but for c++98, to add the ability for custom thread - // pool we add the lock here - // - if (numPending++ == 0) - isEmpty.wait (); + inFlight.fetch_add (1); + + // if we are the first task off the rank, clear the + // isEmpty semaphore such that the group will actually pause + // until the task finishes + if (numPending.fetch_add (1) == 0) { isEmpty.wait (); } } void TaskGroup::Data::removeTask () { - // Alas, given the current bug in glibc we need a secondary - // syncronisation primitive here to account for the fact that - // destructing the isEmpty Semaphore in a separate thread can - // cause an error. Issuing the post call here the current libc - // implementation attempts to access memory *after* it has woken - // waiting threads. 
- // Since other threads are entitled to delete the semaphore the - // access to the memory location can be invalid. - // http://sources.redhat.com/bugzilla/show_bug.cgi?id=12674 - // Update: this bug has been fixed, but how do we know which - // glibc version we're in? - - // Further update: + // if we are the last task, notify the group we're done + if (numPending.fetch_sub (1) == 1) { isEmpty.post (); } + + // in theory, a background thread could actually finish a task + // prior to the next task being added. The fetch_add / fetch_sub + // logic between addTask and removeTask are fine to keep the + // inverted semaphore straight. All addTask must happen prior to + // the TaskGroup destructor. // - // we could remove this if it is a new enough glibc, however - // we've changed the API to enable a custom override of a - // thread pool. In order to provide safe access to the numPending, - // we need the lock anyway, except for c++11 or newer - if (--numPending == 0) - { -#ifdef ENABLE_SEM_DTOR_WORKAROUND - std::lock_guard lk (dtorMutex); -#endif - isEmpty.post (); - } + // But to let the taskgroup thread waiting know we're actually + // finished with the last one and finished posting (the semaphore + // might wake up the other thread while in the middle of post) so + // we don't destroy the semaphore while posting to it, keep a + // separate counter that is modified pre / post semaphore + inFlight.fetch_sub (1); } @@ -491,8 +405,7 @@ TaskGroup::Data::removeTask () // struct ThreadPool::Data // -ThreadPool::Data::Data (): - provUsers (0), provider (NULL) +ThreadPool::Data::Data () { // empty } @@ -500,82 +413,7 @@ ThreadPool::Data::Data (): ThreadPool::Data::~Data() { - ThreadPoolProvider *p = provider.load( std::memory_order_relaxed ); - p->finish(); - delete p; -} - -inline ThreadPool::Data::SafeProvider -ThreadPool::Data::getProvider () -{ - provUsers.fetch_add( 1, std::memory_order_relaxed ); - return SafeProvider( this, provider.load( std::memory_order_relaxed 
) ); -} - - -inline void -ThreadPool::Data::coalesceProviderUse () -{ - int ov = provUsers.fetch_sub( 1, std::memory_order_relaxed ); - // ov is the previous value, so one means that now it might be 0 - if ( ov == 1 ) - { - // do we have anything to do here? - } -} - - -inline void -ThreadPool::Data::bumpProviderUse () -{ - provUsers.fetch_add( 1, std::memory_order_relaxed ); -} - - -inline void -ThreadPool::Data::setProvider (ThreadPoolProvider *p) -{ - ThreadPoolProvider *old = provider.load( std::memory_order_relaxed ); - // work around older gcc bug just in case - do - { - if ( ! provider.compare_exchange_weak( old, p, std::memory_order_release, std::memory_order_relaxed ) ) - continue; - } while (false); // NOSONAR - suppress SonarCloud bug report. - - // wait for any other users to finish prior to deleting, given - // that these are just mostly to query the thread count or push a - // task to the queue (so fast), just spin... - // - // (well, and normally, people don't do this mid stream anyway, so - // this will be 0 99.999% of the time, but just to be safe) - // - while ( provUsers.load( std::memory_order_relaxed ) > 0 ) - std::this_thread::yield(); - - if ( old ) - { - old->finish(); - delete old; - } - - // NB: the shared_ptr mechanism is safer and means we don't have - // to have the provUsers counter since the shared_ptr keeps that - // for us. However, gcc 4.8/9 compilers which many people are - // still using even though it is 2018 forgot to add the shared_ptr - // functions... once that compiler is fully deprecated, switch to - // using the below, change provider to a std::shared_ptr and remove - // provUsers... - // -// std::shared_ptr newp( p ); -// std::shared_ptr curp = std::atomic_load_explicit( &provider, std::memory_order_relaxed ); -// do -// { -// if ( ! 
std::atomic_compare_exchange_weak_explicit( &provider, &curp, newp, std::memory_order_release, std::memory_order_relaxed ) ) -// continue; -// } while ( false ); -// if ( curp ) -// curp->finish(); + setProvider (nullptr); } #endif // ENABLE_THREADING @@ -608,7 +446,7 @@ Task::group () TaskGroup::TaskGroup (): #ifdef ENABLE_THREADING - _data (new Data()) + _data (new Data) #else _data (nullptr) #endif @@ -620,6 +458,7 @@ TaskGroup::TaskGroup (): TaskGroup::~TaskGroup () { #ifdef ENABLE_THREADING + _data->waitForEmpty (); delete _data; #endif } @@ -660,10 +499,7 @@ ThreadPool::ThreadPool (unsigned nthreads): #endif { #ifdef ENABLE_THREADING - if ( nthreads == 0 ) - _data->setProvider( new NullThreadPoolProvider ); - else - _data->setProvider( new DefaultThreadPoolProvider( int(nthreads) ) ); + setNumThreads (static_cast (nthreads)); #endif } @@ -671,6 +507,8 @@ ThreadPool::ThreadPool (unsigned nthreads): ThreadPool::~ThreadPool () { #ifdef ENABLE_THREADING + // ensures any jobs / threads are finished & shutdown + _data->setProvider (nullptr); delete _data; #endif } @@ -680,7 +518,8 @@ int ThreadPool::numThreads () const { #ifdef ENABLE_THREADING - return _data->getProvider ()->numThreads (); + Data::ProviderPtr sp = _data->getProvider (); + return (sp) ? 
sp->numThreads () : 0; #else return 0; #endif @@ -695,36 +534,30 @@ ThreadPool::setNumThreads (int count) throw IEX_INTERNAL_NAMESPACE::ArgExc ("Attempt to set the number of threads " "in a thread pool to a negative value."); - bool doReset = false; { - Data::SafeProvider sp = _data->getProvider (); - int curT = sp->numThreads (); - if ( curT == count ) - return; - - if ( curT == 0 ) + Data::ProviderPtr sp = _data->getProvider (); + if (sp) { - NullThreadPoolProvider *npp = dynamic_cast( sp.get() ); - if ( npp ) - doReset = true; - } - else if ( count == 0 ) - { - DefaultThreadPoolProvider *dpp = dynamic_cast( sp.get() ); - if ( dpp ) - doReset = true; + bool doReset = false; + int curT = sp->numThreads (); + if (curT == count) return; + + if (count != 0) + { + sp->setNumThreads (count); + return; + } } - if ( ! doReset ) - sp->setNumThreads( count ); } - if ( doReset ) - { - if ( count == 0 ) - _data->setProvider( new NullThreadPoolProvider ); - else - _data->setProvider( new DefaultThreadPoolProvider( count ) ); - } + // either a null provider or a case where we should switch from + // a default provider to a null one or vice-versa + if (count == 0) + _data->setProvider (nullptr); + else + _data->setProvider ( + std::make_shared (count)); + #else // just blindly ignore (void)count; @@ -736,7 +569,8 @@ void ThreadPool::setThreadProvider (ThreadPoolProvider *provider) { #ifdef ENABLE_THREADING - _data->setProvider (provider); + // contract is we take ownership and will free the provider + _data->setProvider (Data::ProviderPtr (provider)); #else throw IEX_INTERNAL_NAMESPACE::ArgExc ( "Attempt to set a thread provider on a system with threads" @@ -748,12 +582,19 @@ ThreadPool::setThreadProvider (ThreadPoolProvider *provider) void ThreadPool::addTask (Task* task) { + if (task) + { #ifdef ENABLE_THREADING - _data->getProvider ()->addTask (task); -#else - task->execute (); - delete task; + Data::ProviderPtr p = _data->getProvider (); + if (p) + { + p->addTask (task); + 
return; + } #endif + + handleProcessTask (task); + } } @@ -780,7 +621,24 @@ unsigned ThreadPool::estimateThreadCountForFileIO () { #ifdef ENABLE_THREADING - return std::thread::hardware_concurrency (); + unsigned rv = std::thread::hardware_concurrency (); + // hardware concurrency is not required to work + if (rv == 0 || + rv > static_cast (std::numeric_limits::max ())) + { + rv = 1; +# if (defined(_WIN32) || defined(_WIN64)) + SYSTEM_INFO si; + GetNativeSystemInfo (&si); + + rv = si.dwNumberOfProcessors; +# else + // linux, bsd, and mac are fine with this + // other *nix should be too, right? + rv = sysconf (_SC_NPROCESSORS_ONLN); +# endif + } + return rv; #else return 0; #endif diff --git a/src/lib/IlmThread/IlmThreadSemaphore.h b/src/lib/IlmThread/IlmThreadSemaphore.h index f26e48a09..576968aa6 100644 --- a/src/lib/IlmThread/IlmThreadSemaphore.h +++ b/src/lib/IlmThread/IlmThreadSemaphore.h @@ -64,10 +64,10 @@ class ILMTHREAD_EXPORT_TYPE Semaphore mutable HANDLE _semaphore; #elif ILMTHREAD_THREADING_ENABLED - // - // If the platform has threads but no semapohores, - // then we implement them ourselves using condition variables - // + // + // If the platform has threads but no semaphores, + // then we implement them ourselves using condition variables + // struct sema_t {