diff options
author | Devtools Arcadia <arcadia-devtools@yandex-team.ru> | 2022-02-07 18:08:42 +0300 |
---|---|---|
committer | Devtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net> | 2022-02-07 18:08:42 +0300 |
commit | 1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch) | |
tree | e26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/threading/local_executor/local_executor.h | |
download | ydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz |
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/threading/local_executor/local_executor.h')
-rw-r--r-- | library/cpp/threading/local_executor/local_executor.h | 294 |
1 files changed, 294 insertions, 0 deletions
diff --git a/library/cpp/threading/local_executor/local_executor.h b/library/cpp/threading/local_executor/local_executor.h new file mode 100644 index 0000000000..c1c824f67c --- /dev/null +++ b/library/cpp/threading/local_executor/local_executor.h @@ -0,0 +1,294 @@ +#pragma once + +#include <library/cpp/threading/future/future.h> + +#include <util/generic/cast.h> +#include <util/generic/fwd.h> +#include <util/generic/noncopyable.h> +#include <util/generic/ptr.h> +#include <util/generic/singleton.h> +#include <util/generic/ymath.h> + +#include <functional> + +namespace NPar { + struct ILocallyExecutable : virtual public TThrRefBase { + // Must be implemented by the end user to define job that will be processed by one of + // executor threads. + // + // @param id Job parameter, typically an index pointing somewhere in array, or just + // some dummy value, e.g. `0`. + virtual void LocalExec(int id) = 0; + }; + + // Alternative and simpler way of describing a job for executor. Function argument has the + // same meaning as `id` in `ILocallyExecutable::LocalExec`. + // + using TLocallyExecutableFunction = std::function<void(int)>; + + class ILocalExecutor: public TNonCopyable { + public: + ILocalExecutor() = default; + virtual ~ILocalExecutor() = default; + + enum EFlags : int { + HIGH_PRIORITY = 0, + MED_PRIORITY = 1, + LOW_PRIORITY = 2, + PRIORITY_MASK = 3, + WAIT_COMPLETE = 4 + }; + + // Add task for further execution. + // + // @param exec Task description. + // @param id Task argument. + // @param flags Bitmask composed by `HIGH_PRIORITY`, `MED_PRIORITY`, `LOW_PRIORITY` + // and `WAIT_COMPLETE`. + virtual void Exec(TIntrusivePtr<ILocallyExecutable> exec, int id, int flags) = 0; + + // Add tasks range for further execution. + // + // @param exec Task description. + // @param firstId, lastId Task arguments [firstId, lastId) + // @param flags Same as for `Exec`. + virtual void ExecRange(TIntrusivePtr<ILocallyExecutable> exec, int firstId, int lastId, int flags) = 0; + + // 0-based ILocalExecutor worker thread identification + virtual int GetWorkerThreadId() const noexcept = 0; + virtual int GetThreadCount() const noexcept = 0; + + // Describes a range of tasks with parameters from integer range [FirstId, LastId). + // + class TExecRangeParams { + public: + template <typename TFirst, typename TLast> + TExecRangeParams(TFirst firstId, TLast lastId) + : FirstId(SafeIntegerCast<int>(firstId)) + , LastId(SafeIntegerCast<int>(lastId)) + { + Y_ASSERT(LastId >= FirstId); + SetBlockSize(1); + } + // Partition tasks into `blockCount` blocks of approximately equal size, each of which + // will be executed as a separate bigger task. + // + template <typename TBlockCount> + TExecRangeParams& SetBlockCount(TBlockCount blockCount) { + Y_ASSERT(SafeIntegerCast<int>(blockCount) > 0 || FirstId == LastId); + BlockSize = FirstId == LastId ? 0 : CeilDiv(LastId - FirstId, SafeIntegerCast<int>(blockCount)); + BlockCount = BlockSize == 0 ? 0 : CeilDiv(LastId - FirstId, BlockSize); + BlockEqualToThreads = false; + return *this; + } + // Partition tasks into blocks of approximately `blockSize` size, each of which will + // be executed as a separate bigger task. + // + template <typename TBlockSize> + TExecRangeParams& SetBlockSize(TBlockSize blockSize) { + Y_ASSERT(SafeIntegerCast<int>(blockSize) > 0 || FirstId == LastId); + BlockSize = SafeIntegerCast<int>(blockSize); + BlockCount = BlockSize == 0 ? 0 : CeilDiv(LastId - FirstId, BlockSize); + BlockEqualToThreads = false; + return *this; + } + // Partition tasks into thread count blocks of approximately equal size, each of which + // will be executed as a separate bigger task. + // + TExecRangeParams& SetBlockCountToThreadCount() { + BlockEqualToThreads = true; + return *this; + } + int GetBlockCount() const { + Y_ASSERT(!BlockEqualToThreads); + return BlockCount; + } + int GetBlockSize() const { + Y_ASSERT(!BlockEqualToThreads); + return BlockSize; + } + bool GetBlockEqualToThreads() { + return BlockEqualToThreads; + } + + const int FirstId = 0; + const int LastId = 0; + + private: + int BlockSize; + int BlockCount; + bool BlockEqualToThreads; + }; + + // `Exec` and `ExecRange` versions that accept functions. + // + void Exec(TLocallyExecutableFunction exec, int id, int flags); + void ExecRange(TLocallyExecutableFunction exec, int firstId, int lastId, int flags); + + // Version of `ExecRange` that throws exception from task with minimal id if at least one of + // task threw an exception. + // + void ExecRangeWithThrow(TLocallyExecutableFunction exec, int firstId, int lastId, int flags); + + // Version of `ExecRange` that returns vector of futures, thus allowing to retry any task if + // it fails. + // + TVector<NThreading::TFuture<void>> ExecRangeWithFutures(TLocallyExecutableFunction exec, int firstId, int lastId, int flags); + + template <typename TBody> + static inline auto BlockedLoopBody(const TExecRangeParams& params, const TBody& body) { + return [=](int blockId) { + const int blockFirstId = params.FirstId + blockId * params.GetBlockSize(); + const int blockLastId = Min(params.LastId, blockFirstId + params.GetBlockSize()); + for (int i = blockFirstId; i < blockLastId; ++i) { + body(i); + } + }; + } + + template <typename TBody> + inline void ExecRange(TBody&& body, TExecRangeParams params, int flags) { + if (TryExecRangeSequentially(body, params.FirstId, params.LastId, flags)) { + return; + } + if (params.GetBlockEqualToThreads()) { + params.SetBlockCount(GetThreadCount() + ((flags & WAIT_COMPLETE) != 0)); // ThreadCount or ThreadCount+1 depending on WaitFlag + } + ExecRange(BlockedLoopBody(params, body), 0, params.GetBlockCount(), flags); + } + + template <typename TBody> + inline void ExecRangeBlockedWithThrow(TBody&& body, int firstId, int lastId, int batchSizeOrZeroForAutoBatchSize, int flags) { + if (firstId >= lastId) { + return; + } + const int threadCount = Max(GetThreadCount(), 1); + const int batchSize = batchSizeOrZeroForAutoBatchSize + ? batchSizeOrZeroForAutoBatchSize + : (lastId - firstId + threadCount - 1) / threadCount; + const int batchCount = (lastId - firstId + batchSize - 1) / batchSize; + const int batchCountPerThread = (batchCount + threadCount - 1) / threadCount; + auto states = ExecRangeWithFutures( + [=](int threadId) { + for (int batchIdPerThread = 0; batchIdPerThread < batchCountPerThread; ++batchIdPerThread) { + int batchId = batchIdPerThread * threadCount + threadId; + int begin = firstId + batchId * batchSize; + int end = Min(begin + batchSize, lastId); + for (int i = begin; i < end; ++i) { + body(i); + } + } + }, + 0, threadCount, flags); + for (auto& state: states) { + state.GetValueSync(); // Re-throw exception if any. + } + } + + template <typename TBody> + static inline bool TryExecRangeSequentially(TBody&& body, int firstId, int lastId, int flags) { + if (lastId == firstId) { + return true; + } + if ((flags & WAIT_COMPLETE) && lastId - firstId == 1) { + body(firstId); + return true; + } + return false; + } + }; + + // `TLocalExecutor` provides facilities for easy parallelization of existing code and cycles. + // + // Examples: + // Execute one task with medium priority and wait for it completion. + // ``` + // LocalExecutor().Run(4); + // TEvent event; + // LocalExecutor().Exec([](int) { + // SomeFunc(); + // event.Signal(); + // }, 0, TLocalExecutor::MED_PRIORITY); + // + // SomeOtherCode(); + // event.WaitI(); + // ``` + // + // Execute range of tasks with medium priority. + // ``` + // LocalExecutor().Run(4); + // LocalExecutor().ExecRange([](int id) { + // SomeFunc(id); + // }, TExecRangeParams(0, 10), TLocalExecutor::WAIT_COMPLETE | TLocalExecutor::MED_PRIORITY); + // ``` + // + class TLocalExecutor final: public ILocalExecutor { + public: + using EFlags = ILocalExecutor::EFlags; + + // Creates executor without threads. You'll need to explicitly call `RunAdditionalThreads` + // to add threads to underlying thread pool. + // + TLocalExecutor(); + ~TLocalExecutor(); + + int GetQueueSize() const noexcept; + int GetMPQueueSize() const noexcept; + int GetLPQueueSize() const noexcept; + void ClearLPQueue(); + + // 0-based TLocalExecutor worker thread identification + int GetWorkerThreadId() const noexcept override; + int GetThreadCount() const noexcept override; + + // **Add** threads to underlying thread pool. + // + // @param threadCount Number of threads to add. + void RunAdditionalThreads(int threadCount); + + // Add task for further execution. + // + // @param exec Task description. + // @param id Task argument. + // @param flags Bitmask composed by `HIGH_PRIORITY`, `MED_PRIORITY`, `LOW_PRIORITY` + // and `WAIT_COMPLETE`. + void Exec(TIntrusivePtr<ILocallyExecutable> exec, int id, int flags) override; + + // Add tasks range for further execution. + // + // @param exec Task description. + // @param firstId, lastId Task arguments [firstId, lastId) + // @param flags Same as for `Exec`. + void ExecRange(TIntrusivePtr<ILocallyExecutable> exec, int firstId, int lastId, int flags) override; + + using ILocalExecutor::Exec; + using ILocalExecutor::ExecRange; + + private: + class TImpl; + THolder<TImpl> Impl_; + }; + + static inline TLocalExecutor& LocalExecutor() { + return *Singleton<TLocalExecutor>(); + } + + template <typename TBody> + inline void ParallelFor(ILocalExecutor& executor, ui32 from, ui32 to, TBody&& body) { + ILocalExecutor::TExecRangeParams params(from, to); + params.SetBlockCountToThreadCount(); + executor.ExecRange(std::forward<TBody>(body), params, TLocalExecutor::WAIT_COMPLETE); + } + + template <typename TBody> + inline void ParallelFor(ui32 from, ui32 to, TBody&& body) { + ParallelFor(LocalExecutor(), from, to, std::forward<TBody>(body)); + } + + template <typename TBody> + inline void AsyncParallelFor(ui32 from, ui32 to, TBody&& body) { + ILocalExecutor::TExecRangeParams params(from, to); + params.SetBlockCountToThreadCount(); + LocalExecutor().ExecRange(std::forward<TBody>(body), params, 0); + } +} |