diff options
author | orivej <orivej@yandex-team.ru> | 2022-02-10 16:44:49 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:44:49 +0300 |
commit | 718c552901d703c502ccbefdfc3c9028d608b947 (patch) | |
tree | 46534a98bbefcd7b1f3faa5b52c138ab27db75b7 /contrib/libs/poco/Foundation/src/ThreadPool.cpp | |
parent | e9656aae26e0358d5378e5b63dcac5c8dbe0e4d0 (diff) | |
download | ydb-718c552901d703c502ccbefdfc3c9028d608b947.tar.gz |
Restoring authorship annotation for <orivej@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'contrib/libs/poco/Foundation/src/ThreadPool.cpp')
-rw-r--r-- | contrib/libs/poco/Foundation/src/ThreadPool.cpp | 1052 |
1 files changed, 526 insertions, 526 deletions
diff --git a/contrib/libs/poco/Foundation/src/ThreadPool.cpp b/contrib/libs/poco/Foundation/src/ThreadPool.cpp index 47716ce19f..5a03589cb2 100644 --- a/contrib/libs/poco/Foundation/src/ThreadPool.cpp +++ b/contrib/libs/poco/Foundation/src/ThreadPool.cpp @@ -1,527 +1,527 @@ -// -// ThreadPool.cpp -// -// Library: Foundation -// Package: Threading -// Module: ThreadPool -// -// Copyright (c) 2004-2006, Applied Informatics Software Engineering GmbH. -// and Contributors. -// -// SPDX-License-Identifier: BSL-1.0 -// - - -#include "Poco/ThreadPool.h" -#include "Poco/Runnable.h" -#include "Poco/Thread.h" -#include "Poco/Event.h" -#include "Poco/ThreadLocal.h" -#include "Poco/ErrorHandler.h" -#include <sstream> -#include <ctime> -#if defined(_WIN32_WCE) && _WIN32_WCE < 0x800 -#error #include "wce_time.h" -#endif - - -namespace Poco { - - -class PooledThread: public Runnable -{ -public: - PooledThread(const std::string& name, int stackSize = POCO_THREAD_STACK_SIZE); - ~PooledThread(); - - void start(); - void start(Thread::Priority priority, Runnable& target); - void start(Thread::Priority priority, Runnable& target, const std::string& name); - bool idle(); - int idleTime(); - void join(); - void activate(); - void release(); - void run(); - -private: - volatile bool _idle; - volatile std::time_t _idleTime; - Runnable* _pTarget; - std::string _name; - Thread _thread; - Event _targetReady; - Event _targetCompleted; - Event _started; - FastMutex _mutex; -}; - - -PooledThread::PooledThread(const std::string& name, int stackSize): - _idle(true), - _idleTime(0), - _pTarget(0), - _name(name), - _thread(name), - _targetCompleted(false) -{ - poco_assert_dbg (stackSize >= 0); - _thread.setStackSize(stackSize); -#if defined(_WIN32_WCE) && _WIN32_WCE < 0x800 - _idleTime = wceex_time(NULL); -#else - _idleTime = std::time(NULL); -#endif -} - - -PooledThread::~PooledThread() -{ -} - - -void PooledThread::start() -{ - _thread.start(*this); - _started.wait(); -} - - -void PooledThread::start(Thread::Priority priority, Runnable& target) -{ - FastMutex::ScopedLock lock(_mutex); - - poco_assert (_pTarget == 0); - - _pTarget = ⌖ - _thread.setPriority(priority); - _targetReady.set(); -} - - -void PooledThread::start(Thread::Priority priority, Runnable& target, const std::string& name) -{ - FastMutex::ScopedLock lock(_mutex); - - std::string fullName(name); - if (name.empty()) - { - fullName = _name; - } - else - { - fullName.append(" ("); - fullName.append(_name); - fullName.append(")"); - } - _thread.setName(fullName); - _thread.setPriority(priority); - - poco_assert (_pTarget == 0); - - _pTarget = ⌖ - _targetReady.set(); -} - - -inline bool PooledThread::idle() -{ - FastMutex::ScopedLock lock(_mutex); - return _idle; -} - - -int PooledThread::idleTime() -{ - FastMutex::ScopedLock lock(_mutex); - -#if defined(_WIN32_WCE) && _WIN32_WCE < 0x800 - return (int) (wceex_time(NULL) - _idleTime); -#else - return (int) (time(NULL) - _idleTime); -#endif -} - - -void PooledThread::join() -{ - _mutex.lock(); - Runnable* pTarget = _pTarget; - _mutex.unlock(); - if (pTarget) - _targetCompleted.wait(); -} - - -void PooledThread::activate() -{ - FastMutex::ScopedLock lock(_mutex); - - poco_assert (_idle); - _idle = false; - _targetCompleted.reset(); -} - - -void PooledThread::release() -{ - const long JOIN_TIMEOUT = 10000; - - _mutex.lock(); - _pTarget = 0; - _mutex.unlock(); - // In case of a statically allocated thread pool (such - // as the default thread pool), Windows may have already - // terminated the thread before we got here. - if (_thread.isRunning()) - _targetReady.set(); - - if (_thread.tryJoin(JOIN_TIMEOUT)) - { - delete this; - } -} - - -void PooledThread::run() -{ - _started.set(); - for (;;) - { - _targetReady.wait(); - _mutex.lock(); - if (_pTarget) // a NULL target means kill yourself - { - Runnable* pTarget = _pTarget; - _mutex.unlock(); - try - { - pTarget->run(); - } - catch (Exception& exc) - { - ErrorHandler::handle(exc); - } - catch (std::exception& exc) - { - ErrorHandler::handle(exc); - } - catch (...) - { - ErrorHandler::handle(); - } - FastMutex::ScopedLock lock(_mutex); - _pTarget = 0; -#if defined(_WIN32_WCE) && _WIN32_WCE < 0x800 - _idleTime = wceex_time(NULL); -#else - _idleTime = time(NULL); -#endif - _idle = true; - _targetCompleted.set(); - ThreadLocalStorage::clear(); - _thread.setName(_name); - _thread.setPriority(Thread::PRIO_NORMAL); - } - else - { - _mutex.unlock(); - break; - } - } -} - - -ThreadPool::ThreadPool(int minCapacity, - int maxCapacity, - int idleTime, +// +// ThreadPool.cpp +// +// Library: Foundation +// Package: Threading +// Module: ThreadPool +// +// Copyright (c) 2004-2006, Applied Informatics Software Engineering GmbH. +// and Contributors. +// +// SPDX-License-Identifier: BSL-1.0 +// + + +#include "Poco/ThreadPool.h" +#include "Poco/Runnable.h" +#include "Poco/Thread.h" +#include "Poco/Event.h" +#include "Poco/ThreadLocal.h" +#include "Poco/ErrorHandler.h" +#include <sstream> +#include <ctime> +#if defined(_WIN32_WCE) && _WIN32_WCE < 0x800 +#error #include "wce_time.h" +#endif + + +namespace Poco { + + +class PooledThread: public Runnable +{ +public: + PooledThread(const std::string& name, int stackSize = POCO_THREAD_STACK_SIZE); + ~PooledThread(); + + void start(); + void start(Thread::Priority priority, Runnable& target); + void start(Thread::Priority priority, Runnable& target, const std::string& name); + bool idle(); + int idleTime(); + void join(); + void activate(); + void release(); + void run(); + +private: + volatile bool _idle; + volatile std::time_t _idleTime; + Runnable* _pTarget; + std::string _name; + Thread _thread; + Event _targetReady; + Event _targetCompleted; + Event _started; + FastMutex _mutex; +}; + + +PooledThread::PooledThread(const std::string& name, int stackSize): + _idle(true), + _idleTime(0), + _pTarget(0), + _name(name), + _thread(name), + _targetCompleted(false) +{ + poco_assert_dbg (stackSize >= 0); + _thread.setStackSize(stackSize); +#if defined(_WIN32_WCE) && _WIN32_WCE < 0x800 + _idleTime = wceex_time(NULL); +#else + _idleTime = std::time(NULL); +#endif +} + + +PooledThread::~PooledThread() +{ +} + + +void PooledThread::start() +{ + _thread.start(*this); + _started.wait(); +} + + +void PooledThread::start(Thread::Priority priority, Runnable& target) +{ + FastMutex::ScopedLock lock(_mutex); + + poco_assert (_pTarget == 0); + + _pTarget = ⌖ + _thread.setPriority(priority); + _targetReady.set(); +} + + +void PooledThread::start(Thread::Priority priority, Runnable& target, const std::string& name) +{ + FastMutex::ScopedLock lock(_mutex); + + std::string fullName(name); + if (name.empty()) + { + fullName = _name; + } + else + { + fullName.append(" ("); + fullName.append(_name); + fullName.append(")"); + } + _thread.setName(fullName); + _thread.setPriority(priority); + + poco_assert (_pTarget == 0); + + _pTarget = ⌖ + _targetReady.set(); +} + + +inline bool PooledThread::idle() +{ + FastMutex::ScopedLock lock(_mutex); + return _idle; +} + + +int PooledThread::idleTime() +{ + FastMutex::ScopedLock lock(_mutex); + +#if defined(_WIN32_WCE) && _WIN32_WCE < 0x800 + return (int) (wceex_time(NULL) - _idleTime); +#else + return (int) (time(NULL) - _idleTime); +#endif +} + + +void PooledThread::join() +{ + _mutex.lock(); + Runnable* pTarget = _pTarget; + _mutex.unlock(); + if (pTarget) + _targetCompleted.wait(); +} + + +void PooledThread::activate() +{ + FastMutex::ScopedLock lock(_mutex); + + poco_assert (_idle); + _idle = false; + _targetCompleted.reset(); +} + + +void PooledThread::release() +{ + const long JOIN_TIMEOUT = 10000; + + _mutex.lock(); + _pTarget = 0; + _mutex.unlock(); + // In case of a statically allocated thread pool (such + // as the default thread pool), Windows may have already + // terminated the thread before we got here. + if (_thread.isRunning()) + _targetReady.set(); + + if (_thread.tryJoin(JOIN_TIMEOUT)) + { + delete this; + } +} + + +void PooledThread::run() +{ + _started.set(); + for (;;) + { + _targetReady.wait(); + _mutex.lock(); + if (_pTarget) // a NULL target means kill yourself + { + Runnable* pTarget = _pTarget; + _mutex.unlock(); + try + { + pTarget->run(); + } + catch (Exception& exc) + { + ErrorHandler::handle(exc); + } + catch (std::exception& exc) + { + ErrorHandler::handle(exc); + } + catch (...) + { + ErrorHandler::handle(); + } + FastMutex::ScopedLock lock(_mutex); + _pTarget = 0; +#if defined(_WIN32_WCE) && _WIN32_WCE < 0x800 + _idleTime = wceex_time(NULL); +#else + _idleTime = time(NULL); +#endif + _idle = true; + _targetCompleted.set(); + ThreadLocalStorage::clear(); + _thread.setName(_name); + _thread.setPriority(Thread::PRIO_NORMAL); + } + else + { + _mutex.unlock(); + break; + } + } +} + + +ThreadPool::ThreadPool(int minCapacity, + int maxCapacity, + int idleTime, + int stackSize): + _minCapacity(minCapacity), + _maxCapacity(maxCapacity), + _idleTime(idleTime), + _serial(0), + _age(0), + _stackSize(stackSize) +{ + poco_assert (minCapacity >= 1 && maxCapacity >= minCapacity && idleTime > 0); + + for (int i = 0; i < _minCapacity; i++) + { + PooledThread* pThread = createThread(); + _threads.push_back(pThread); + pThread->start(); + } +} + + +ThreadPool::ThreadPool(const std::string& rName, + int minCapacity, + int maxCapacity, + int idleTime, int stackSize): - _minCapacity(minCapacity), - _maxCapacity(maxCapacity), - _idleTime(idleTime), - _serial(0), - _age(0), - _stackSize(stackSize) -{ - poco_assert (minCapacity >= 1 && maxCapacity >= minCapacity && idleTime > 0); - - for (int i = 0; i < _minCapacity; i++) - { - PooledThread* pThread = createThread(); - _threads.push_back(pThread); - pThread->start(); - } -} - - -ThreadPool::ThreadPool(const std::string& rName, - int minCapacity, - int maxCapacity, - int idleTime, - int stackSize): - _name(rName), - _minCapacity(minCapacity), - _maxCapacity(maxCapacity), - _idleTime(idleTime), - _serial(0), - _age(0), - _stackSize(stackSize) -{ - poco_assert (minCapacity >= 1 && maxCapacity >= minCapacity && idleTime > 0); - - for (int i = 0; i < _minCapacity; i++) - { - PooledThread* pThread = createThread(); - _threads.push_back(pThread); - pThread->start(); - } -} - - -ThreadPool::~ThreadPool() -{ - try - { - stopAll(); - } - catch (...) - { - poco_unexpected(); - } -} - - -void ThreadPool::addCapacity(int n) -{ - FastMutex::ScopedLock lock(_mutex); - - poco_assert (_maxCapacity + n >= _minCapacity); - _maxCapacity += n; - housekeep(); -} - - -int ThreadPool::capacity() const -{ - FastMutex::ScopedLock lock(_mutex); - return _maxCapacity; -} - - -int ThreadPool::available() const -{ - FastMutex::ScopedLock lock(_mutex); - - int count = 0; - for (ThreadVec::const_iterator it = _threads.begin(); it != _threads.end(); ++it) - { - if ((*it)->idle()) ++count; - } - return (int) (count + _maxCapacity - _threads.size()); -} - - -int ThreadPool::used() const -{ - FastMutex::ScopedLock lock(_mutex); - - int count = 0; - for (ThreadVec::const_iterator it = _threads.begin(); it != _threads.end(); ++it) - { - if (!(*it)->idle()) ++count; - } - return count; -} - - -int ThreadPool::allocated() const -{ - FastMutex::ScopedLock lock(_mutex); - - return int(_threads.size()); -} - - -void ThreadPool::start(Runnable& target) -{ - getThread()->start(Thread::PRIO_NORMAL, target); -} - - -void ThreadPool::start(Runnable& target, const std::string& rName) -{ - getThread()->start(Thread::PRIO_NORMAL, target, rName); -} - - -void ThreadPool::startWithPriority(Thread::Priority priority, Runnable& target) -{ - getThread()->start(priority, target); -} - - -void ThreadPool::startWithPriority(Thread::Priority priority, Runnable& target, const std::string& rName) -{ - getThread()->start(priority, target, rName); -} - - -void ThreadPool::stopAll() -{ - FastMutex::ScopedLock lock(_mutex); - - for (ThreadVec::iterator it = _threads.begin(); it != _threads.end(); ++it) - { - (*it)->release(); - } - _threads.clear(); -} - - -void ThreadPool::joinAll() -{ - FastMutex::ScopedLock lock(_mutex); - - for (ThreadVec::iterator it = _threads.begin(); it != _threads.end(); ++it) - { - (*it)->join(); - } - housekeep(); -} - - -void ThreadPool::collect() -{ - FastMutex::ScopedLock lock(_mutex); - housekeep(); -} - - -void ThreadPool::housekeep() -{ - _age = 0; - if (_threads.size() <= _minCapacity) - return; - - ThreadVec idleThreads; - ThreadVec expiredThreads; - ThreadVec activeThreads; - idleThreads.reserve(_threads.size()); - activeThreads.reserve(_threads.size()); - - for (ThreadVec::iterator it = _threads.begin(); it != _threads.end(); ++it) - { - if ((*it)->idle()) - { - if ((*it)->idleTime() < _idleTime) - idleThreads.push_back(*it); - else - expiredThreads.push_back(*it); - } - else activeThreads.push_back(*it); - } - int n = (int) activeThreads.size(); - int limit = (int) idleThreads.size() + n; - if (limit < _minCapacity) limit = _minCapacity; - idleThreads.insert(idleThreads.end(), expiredThreads.begin(), expiredThreads.end()); - _threads.clear(); - for (ThreadVec::iterator it = idleThreads.begin(); it != idleThreads.end(); ++it) - { - if (n < limit) - { - _threads.push_back(*it); - ++n; - } - else (*it)->release(); - } - _threads.insert(_threads.end(), activeThreads.begin(), activeThreads.end()); -} - - -PooledThread* ThreadPool::getThread() -{ - FastMutex::ScopedLock lock(_mutex); - - if (++_age == 32) - housekeep(); - - PooledThread* pThread = 0; - for (ThreadVec::iterator it = _threads.begin(); !pThread && it != _threads.end(); ++it) - { - if ((*it)->idle()) - pThread = *it; - } - if (!pThread) - { - if (_threads.size() < _maxCapacity) - { - pThread = createThread(); - try - { - pThread->start(); - _threads.push_back(pThread); - } catch (...) - { - delete pThread; - throw; - } - } - else - throw NoThreadAvailableException(); - } - pThread->activate(); - return pThread; -} - - -PooledThread* ThreadPool::createThread() -{ - std::ostringstream threadName; - threadName << _name << "[#" << ++_serial << "]"; - return new PooledThread(threadName.str(), _stackSize); -} - - -class ThreadPoolSingletonHolder -{ -public: - ThreadPoolSingletonHolder() - { - _pPool = 0; - } - ~ThreadPoolSingletonHolder() - { - delete _pPool; - } - ThreadPool* pool() - { - FastMutex::ScopedLock lock(_mutex); - - if (!_pPool) - { - _pPool = new ThreadPool("default"); - if (POCO_THREAD_STACK_SIZE > 0) - _pPool->setStackSize(POCO_THREAD_STACK_SIZE); - } - return _pPool; - } - -private: - ThreadPool* _pPool; - FastMutex _mutex; -}; - - -namespace -{ - static ThreadPoolSingletonHolder sh; -} - - -ThreadPool& ThreadPool::defaultPool() -{ - return *sh.pool(); -} - - -} // namespace Poco + _name(rName), + _minCapacity(minCapacity), + _maxCapacity(maxCapacity), + _idleTime(idleTime), + _serial(0), + _age(0), + _stackSize(stackSize) +{ + poco_assert (minCapacity >= 1 && maxCapacity >= minCapacity && idleTime > 0); + + for (int i = 0; i < _minCapacity; i++) + { + PooledThread* pThread = createThread(); + _threads.push_back(pThread); + pThread->start(); + } +} + + +ThreadPool::~ThreadPool() +{ + try + { + stopAll(); + } + catch (...) + { + poco_unexpected(); + } +} + + +void ThreadPool::addCapacity(int n) +{ + FastMutex::ScopedLock lock(_mutex); + + poco_assert (_maxCapacity + n >= _minCapacity); + _maxCapacity += n; + housekeep(); +} + + +int ThreadPool::capacity() const +{ + FastMutex::ScopedLock lock(_mutex); + return _maxCapacity; +} + + +int ThreadPool::available() const +{ + FastMutex::ScopedLock lock(_mutex); + + int count = 0; + for (ThreadVec::const_iterator it = _threads.begin(); it != _threads.end(); ++it) + { + if ((*it)->idle()) ++count; + } + return (int) (count + _maxCapacity - _threads.size()); +} + + +int ThreadPool::used() const +{ + FastMutex::ScopedLock lock(_mutex); + + int count = 0; + for (ThreadVec::const_iterator it = _threads.begin(); it != _threads.end(); ++it) + { + if (!(*it)->idle()) ++count; + } + return count; +} + + +int ThreadPool::allocated() const +{ + FastMutex::ScopedLock lock(_mutex); + + return int(_threads.size()); +} + + +void ThreadPool::start(Runnable& target) +{ + getThread()->start(Thread::PRIO_NORMAL, target); +} + + +void ThreadPool::start(Runnable& target, const std::string& rName) +{ + getThread()->start(Thread::PRIO_NORMAL, target, rName); +} + + +void ThreadPool::startWithPriority(Thread::Priority priority, Runnable& target) +{ + getThread()->start(priority, target); +} + + +void ThreadPool::startWithPriority(Thread::Priority priority, Runnable& target, const std::string& rName) +{ + getThread()->start(priority, target, rName); +} + + +void ThreadPool::stopAll() +{ + FastMutex::ScopedLock lock(_mutex); + + for (ThreadVec::iterator it = _threads.begin(); it != _threads.end(); ++it) + { + (*it)->release(); + } + _threads.clear(); +} + + +void ThreadPool::joinAll() +{ + FastMutex::ScopedLock lock(_mutex); + + for (ThreadVec::iterator it = _threads.begin(); it != _threads.end(); ++it) + { + (*it)->join(); + } + housekeep(); +} + + +void ThreadPool::collect() +{ + FastMutex::ScopedLock lock(_mutex); + housekeep(); +} + + +void ThreadPool::housekeep() +{ + _age = 0; + if (_threads.size() <= _minCapacity) + return; + + ThreadVec idleThreads; + ThreadVec expiredThreads; + ThreadVec activeThreads; + idleThreads.reserve(_threads.size()); + activeThreads.reserve(_threads.size()); + + for (ThreadVec::iterator it = _threads.begin(); it != _threads.end(); ++it) + { + if ((*it)->idle()) + { + if ((*it)->idleTime() < _idleTime) + idleThreads.push_back(*it); + else + expiredThreads.push_back(*it); + } + else activeThreads.push_back(*it); + } + int n = (int) activeThreads.size(); + int limit = (int) idleThreads.size() + n; + if (limit < _minCapacity) limit = _minCapacity; + idleThreads.insert(idleThreads.end(), expiredThreads.begin(), expiredThreads.end()); + _threads.clear(); + for (ThreadVec::iterator it = idleThreads.begin(); it != idleThreads.end(); ++it) + { + if (n < limit) + { + _threads.push_back(*it); + ++n; + } + else (*it)->release(); + } + _threads.insert(_threads.end(), activeThreads.begin(), activeThreads.end()); +} + + +PooledThread* ThreadPool::getThread() +{ + FastMutex::ScopedLock lock(_mutex); + + if (++_age == 32) + housekeep(); + + PooledThread* pThread = 0; + for (ThreadVec::iterator it = _threads.begin(); !pThread && it != _threads.end(); ++it) + { + if ((*it)->idle()) + pThread = *it; + } + if (!pThread) + { + if (_threads.size() < _maxCapacity) + { + pThread = createThread(); + try + { + pThread->start(); + _threads.push_back(pThread); + } catch (...) + { + delete pThread; + throw; + } + } + else + throw NoThreadAvailableException(); + } + pThread->activate(); + return pThread; +} + + +PooledThread* ThreadPool::createThread() +{ + std::ostringstream threadName; + threadName << _name << "[#" << ++_serial << "]"; + return new PooledThread(threadName.str(), _stackSize); +} + + +class ThreadPoolSingletonHolder +{ +public: + ThreadPoolSingletonHolder() + { + _pPool = 0; + } + ~ThreadPoolSingletonHolder() + { + delete _pPool; + } + ThreadPool* pool() + { + FastMutex::ScopedLock lock(_mutex); + + if (!_pPool) + { + _pPool = new ThreadPool("default"); + if (POCO_THREAD_STACK_SIZE > 0) + _pPool->setStackSize(POCO_THREAD_STACK_SIZE); + } + return _pPool; + } + +private: + ThreadPool* _pPool; + FastMutex _mutex; +}; + + +namespace +{ + static ThreadPoolSingletonHolder sh; +} + + +ThreadPool& ThreadPool::defaultPool() +{ + return *sh.pool(); +} + + +} // namespace Poco |