diff options
author | Anton Samokhvalov <pg83@yandex.ru> | 2022-02-10 16:45:15 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:45:15 +0300 |
commit | 72cb13b4aff9bc9cf22e49251bc8fd143f82538f (patch) | |
tree | da2c34829458c7d4e74bdfbdf85dff449e9e7fb8 /library/cpp/threading/poor_man_openmp | |
parent | 778e51ba091dc39e7b7fcab2b9cf4dbedfb6f2b5 (diff) | |
download | ydb-72cb13b4aff9bc9cf22e49251bc8fd143f82538f.tar.gz |
Restoring authorship annotation for Anton Samokhvalov <pg83@yandex.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/threading/poor_man_openmp')
4 files changed, 88 insertions, 88 deletions
diff --git a/library/cpp/threading/poor_man_openmp/thread_helper.cpp b/library/cpp/threading/poor_man_openmp/thread_helper.cpp index 34cb6507b9..b4ec5c7879 100644 --- a/library/cpp/threading/poor_man_openmp/thread_helper.cpp +++ b/library/cpp/threading/poor_man_openmp/thread_helper.cpp @@ -1,7 +1,7 @@ #include "thread_helper.h" - -#include <util/generic/singleton.h> - -TMtpQueueHelper& TMtpQueueHelper::Instance() { - return *Singleton<TMtpQueueHelper>(); -} + +#include <util/generic/singleton.h> + +TMtpQueueHelper& TMtpQueueHelper::Instance() { + return *Singleton<TMtpQueueHelper>(); +} diff --git a/library/cpp/threading/poor_man_openmp/thread_helper.h b/library/cpp/threading/poor_man_openmp/thread_helper.h index 0ecee0590b..1536c186cb 100644 --- a/library/cpp/threading/poor_man_openmp/thread_helper.h +++ b/library/cpp/threading/poor_man_openmp/thread_helper.h @@ -2,17 +2,17 @@ #include <util/thread/pool.h> #include <util/generic/utility.h> -#include <util/generic/yexception.h> +#include <util/generic/yexception.h> #include <util/system/info.h> #include <util/system/atomic.h> #include <util/system/condvar.h> #include <util/system/mutex.h> -#include <util/stream/output.h> +#include <util/stream/output.h> #include <functional> -#include <cstdlib> +#include <cstdlib> -class TMtpQueueHelper { +class TMtpQueueHelper { public: TMtpQueueHelper() { SetThreadCount(NSystemInfo::CachedNumberOfCpus()); @@ -27,79 +27,79 @@ public: ThreadCount = threads; q = CreateThreadPool(ThreadCount); } - - static TMtpQueueHelper& Instance(); - + + static TMtpQueueHelper& Instance(); + private: size_t ThreadCount; TAutoPtr<IThreadPool> q; }; -namespace NYmp { +namespace NYmp { inline void SetThreadCount(size_t threads) { - TMtpQueueHelper::Instance().SetThreadCount(threads); + TMtpQueueHelper::Instance().SetThreadCount(threads); } inline size_t GetThreadCount() { - return TMtpQueueHelper::Instance().GetThreadCount(); + return TMtpQueueHelper::Instance().GetThreadCount(); } - template <typename T> + template <typename T> inline void ParallelForStaticChunk(T begin, T end, size_t chunkSize, std::function<void(T)> func) { - chunkSize = Max<size_t>(chunkSize, 1); - - size_t threadCount = TMtpQueueHelper::Instance().GetThreadCount(); + chunkSize = Max<size_t>(chunkSize, 1); + + size_t threadCount = TMtpQueueHelper::Instance().GetThreadCount(); IThreadPool* queue = TMtpQueueHelper::Instance().Get(); TCondVar cv; TMutex mutex; TAtomic counter = threadCount; - std::exception_ptr err; - - for (size_t i = 0; i < threadCount; ++i) { - queue->SafeAddFunc([&cv, &counter, &mutex, &func, i, begin, end, chunkSize, threadCount, &err]() { - try { - T currentChunkStart = begin + static_cast<decltype(T() - T())>(i * chunkSize); - - while (currentChunkStart < end) { - T currentChunkEnd = Min<T>(end, currentChunkStart + chunkSize); - - for (T val = currentChunkStart; val < currentChunkEnd; ++val) { - func(val); - } - - currentChunkStart += chunkSize * threadCount; - } - } catch (...) { - with_lock (mutex) { - err = std::current_exception(); - } - } - - with_lock (mutex) { - if (AtomicDecrement(counter) == 0) { - //last one - cv.Signal(); + std::exception_ptr err; + + for (size_t i = 0; i < threadCount; ++i) { + queue->SafeAddFunc([&cv, &counter, &mutex, &func, i, begin, end, chunkSize, threadCount, &err]() { + try { + T currentChunkStart = begin + static_cast<decltype(T() - T())>(i * chunkSize); + + while (currentChunkStart < end) { + T currentChunkEnd = Min<T>(end, currentChunkStart + chunkSize); + + for (T val = currentChunkStart; val < currentChunkEnd; ++val) { + func(val); + } + + currentChunkStart += chunkSize * threadCount; } + } catch (...) { + with_lock (mutex) { + err = std::current_exception(); + } } + + with_lock (mutex) { + if (AtomicDecrement(counter) == 0) { + //last one + cv.Signal(); + } + } }); } - - with_lock (mutex) { - while (AtomicGet(counter) > 0) { - cv.WaitI(mutex); - } - } - - if (err) { - std::rethrow_exception(err); + + with_lock (mutex) { + while (AtomicGet(counter) > 0) { + cv.WaitI(mutex); + } } + + if (err) { + std::rethrow_exception(err); + } } - template <typename T> + template <typename T> inline void ParallelForStaticAutoChunk(T begin, T end, std::function<void(T)> func) { - const size_t taskSize = end - begin; - const size_t threadCount = TMtpQueueHelper::Instance().GetThreadCount(); - + const size_t taskSize = end - begin; + const size_t threadCount = TMtpQueueHelper::Instance().GetThreadCount(); + ParallelForStaticChunk(begin, end, (taskSize + threadCount - 1) / threadCount, func); } -} +} diff --git a/library/cpp/threading/poor_man_openmp/thread_helper_ut.cpp b/library/cpp/threading/poor_man_openmp/thread_helper_ut.cpp index 7417636864..79c7a14b5e 100644 --- a/library/cpp/threading/poor_man_openmp/thread_helper_ut.cpp +++ b/library/cpp/threading/poor_man_openmp/thread_helper_ut.cpp @@ -1,26 +1,26 @@ -#include "thread_helper.h" - +#include "thread_helper.h" + #include <library/cpp/testing/unittest/registar.h> - + #include <util/generic/string.h> -#include <util/generic/yexception.h> - +#include <util/generic/yexception.h> + Y_UNIT_TEST_SUITE(TestMP) { Y_UNIT_TEST(TestErr) { - std::function<void(int)> f = [](int x) { - if (x == 5) { - ythrow yexception() << "oops"; - } - }; - + std::function<void(int)> f = [](int x) { + if (x == 5) { + ythrow yexception() << "oops"; + } + }; + TString s; - - try { - NYmp::ParallelForStaticAutoChunk(0, 10, f); - } catch (...) { - s = CurrentExceptionMessage(); - } - - UNIT_ASSERT(s.find("oops") > 0); - } -} + + try { + NYmp::ParallelForStaticAutoChunk(0, 10, f); + } catch (...) { + s = CurrentExceptionMessage(); + } + + UNIT_ASSERT(s.find("oops") > 0); + } +} diff --git a/library/cpp/threading/poor_man_openmp/ut/ya.make b/library/cpp/threading/poor_man_openmp/ut/ya.make index 6d7aa123ed..7305d14b99 100644 --- a/library/cpp/threading/poor_man_openmp/ut/ya.make +++ b/library/cpp/threading/poor_man_openmp/ut/ya.make @@ -1,12 +1,12 @@ UNITTEST_FOR(library/cpp/threading/poor_man_openmp) - + OWNER( pg agorodilov ) - -SRCS( - thread_helper_ut.cpp -) - -END() + +SRCS( + thread_helper_ut.cpp +) + +END() |