diff options
author | Anton Samokhvalov <pg83@yandex.ru> | 2022-02-10 16:45:17 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:45:17 +0300 |
commit | d3a398281c6fd1d3672036cb2d63f842d2cb28c5 (patch) | |
tree | dd4bd3ca0f36b817e96812825ffaf10d645803f2 /library/cpp/threading/poor_man_openmp/thread_helper.h | |
parent | 72cb13b4aff9bc9cf22e49251bc8fd143f82538f (diff) | |
download | ydb-d3a398281c6fd1d3672036cb2d63f842d2cb28c5.tar.gz |
Restoring authorship annotation for Anton Samokhvalov <pg83@yandex.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/threading/poor_man_openmp/thread_helper.h')
-rw-r--r-- | library/cpp/threading/poor_man_openmp/thread_helper.h | 108 |
1 files changed, 54 insertions, 54 deletions
diff --git a/library/cpp/threading/poor_man_openmp/thread_helper.h b/library/cpp/threading/poor_man_openmp/thread_helper.h index 1536c186cb..0ecee0590b 100644 --- a/library/cpp/threading/poor_man_openmp/thread_helper.h +++ b/library/cpp/threading/poor_man_openmp/thread_helper.h @@ -2,17 +2,17 @@ #include <util/thread/pool.h> #include <util/generic/utility.h> -#include <util/generic/yexception.h> +#include <util/generic/yexception.h> #include <util/system/info.h> #include <util/system/atomic.h> #include <util/system/condvar.h> #include <util/system/mutex.h> -#include <util/stream/output.h> +#include <util/stream/output.h> #include <functional> -#include <cstdlib> +#include <cstdlib> -class TMtpQueueHelper { +class TMtpQueueHelper { public: TMtpQueueHelper() { SetThreadCount(NSystemInfo::CachedNumberOfCpus()); @@ -27,79 +27,79 @@ public: ThreadCount = threads; q = CreateThreadPool(ThreadCount); } - - static TMtpQueueHelper& Instance(); - + + static TMtpQueueHelper& Instance(); + private: size_t ThreadCount; TAutoPtr<IThreadPool> q; }; -namespace NYmp { +namespace NYmp { inline void SetThreadCount(size_t threads) { - TMtpQueueHelper::Instance().SetThreadCount(threads); + TMtpQueueHelper::Instance().SetThreadCount(threads); } inline size_t GetThreadCount() { - return TMtpQueueHelper::Instance().GetThreadCount(); + return TMtpQueueHelper::Instance().GetThreadCount(); } - template <typename T> + template <typename T> inline void ParallelForStaticChunk(T begin, T end, size_t chunkSize, std::function<void(T)> func) { - chunkSize = Max<size_t>(chunkSize, 1); - - size_t threadCount = TMtpQueueHelper::Instance().GetThreadCount(); + chunkSize = Max<size_t>(chunkSize, 1); + + size_t threadCount = TMtpQueueHelper::Instance().GetThreadCount(); IThreadPool* queue = TMtpQueueHelper::Instance().Get(); TCondVar cv; TMutex mutex; TAtomic counter = threadCount; - std::exception_ptr err; - - for (size_t i = 0; i < threadCount; ++i) { - queue->SafeAddFunc([&cv, &counter, &mutex, &func, i, begin, end, chunkSize, threadCount, &err]() { - try { - T currentChunkStart = begin + static_cast<decltype(T() - T())>(i * chunkSize); - - while (currentChunkStart < end) { - T currentChunkEnd = Min<T>(end, currentChunkStart + chunkSize); - - for (T val = currentChunkStart; val < currentChunkEnd; ++val) { - func(val); - } - - currentChunkStart += chunkSize * threadCount; + std::exception_ptr err; + + for (size_t i = 0; i < threadCount; ++i) { + queue->SafeAddFunc([&cv, &counter, &mutex, &func, i, begin, end, chunkSize, threadCount, &err]() { + try { + T currentChunkStart = begin + static_cast<decltype(T() - T())>(i * chunkSize); + + while (currentChunkStart < end) { + T currentChunkEnd = Min<T>(end, currentChunkStart + chunkSize); + + for (T val = currentChunkStart; val < currentChunkEnd; ++val) { + func(val); + } + + currentChunkStart += chunkSize * threadCount; + } + } catch (...) { + with_lock (mutex) { + err = std::current_exception(); + } + } + + with_lock (mutex) { + if (AtomicDecrement(counter) == 0) { + //last one + cv.Signal(); } - } catch (...) { - with_lock (mutex) { - err = std::current_exception(); - } } - - with_lock (mutex) { - if (AtomicDecrement(counter) == 0) { - //last one - cv.Signal(); - } - } }); } - - with_lock (mutex) { - while (AtomicGet(counter) > 0) { - cv.WaitI(mutex); - } + + with_lock (mutex) { + while (AtomicGet(counter) > 0) { + cv.WaitI(mutex); + } + } + + if (err) { + std::rethrow_exception(err); } - - if (err) { - std::rethrow_exception(err); - } } - template <typename T> + template <typename T> inline void ParallelForStaticAutoChunk(T begin, T end, std::function<void(T)> func) { - const size_t taskSize = end - begin; - const size_t threadCount = TMtpQueueHelper::Instance().GetThreadCount(); - + const size_t taskSize = end - begin; + const size_t threadCount = TMtpQueueHelper::Instance().GetThreadCount(); + ParallelForStaticChunk(begin, end, (taskSize + threadCount - 1) / threadCount, func); } -} +} |