diff options
author | Evgueni Petrov <evgueni.s.petrov@gmail.com> | 2022-02-10 16:47:00 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:47:00 +0300 |
commit | 6bde7c5def28273dc3eb4b26959d640ce52e5d2f (patch) | |
tree | b83306b6e37edeea782e9eed673d89286c4fef35 /library/cpp/threading/local_executor/local_executor.h | |
parent | 19d7d7947f95423df4b50d3a6e858cd689db06ed (diff) | |
download | ydb-6bde7c5def28273dc3eb4b26959d640ce52e5d2f.tar.gz |
Restoring authorship annotation for Evgueni Petrov <evgueni.s.petrov@gmail.com>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/threading/local_executor/local_executor.h')
-rw-r--r-- | library/cpp/threading/local_executor/local_executor.h | 302 |
1 files changed, 151 insertions, 151 deletions
diff --git a/library/cpp/threading/local_executor/local_executor.h b/library/cpp/threading/local_executor/local_executor.h index cbde5c62f4..c1c824f67c 100644 --- a/library/cpp/threading/local_executor/local_executor.h +++ b/library/cpp/threading/local_executor/local_executor.h @@ -2,7 +2,7 @@ #include <library/cpp/threading/future/future.h> -#include <util/generic/cast.h> +#include <util/generic/cast.h> #include <util/generic/fwd.h> #include <util/generic/noncopyable.h> #include <util/generic/ptr.h> @@ -26,11 +26,11 @@ namespace NPar { // using TLocallyExecutableFunction = std::function<void(int)>; - class ILocalExecutor: public TNonCopyable { + class ILocalExecutor: public TNonCopyable { public: - ILocalExecutor() = default; - virtual ~ILocalExecutor() = default; - + ILocalExecutor() = default; + virtual ~ILocalExecutor() = default; + enum EFlags : int { HIGH_PRIORITY = 0, MED_PRIORITY = 1, @@ -38,60 +38,60 @@ namespace NPar { PRIORITY_MASK = 3, WAIT_COMPLETE = 4 }; - - // Add task for further execution. - // - // @param exec Task description. - // @param id Task argument. - // @param flags Bitmask composed by `HIGH_PRIORITY`, `MED_PRIORITY`, `LOW_PRIORITY` - // and `WAIT_COMPLETE`. - virtual void Exec(TIntrusivePtr<ILocallyExecutable> exec, int id, int flags) = 0; - - // Add tasks range for further execution. - // - // @param exec Task description. - // @param firstId, lastId Task arguments [firstId, lastId) - // @param flags Same as for `Exec`. - virtual void ExecRange(TIntrusivePtr<ILocallyExecutable> exec, int firstId, int lastId, int flags) = 0; - - // 0-based ILocalExecutor worker thread identification - virtual int GetWorkerThreadId() const noexcept = 0; - virtual int GetThreadCount() const noexcept = 0; - + + // Add task for further execution. + // + // @param exec Task description. + // @param id Task argument. + // @param flags Bitmask composed by `HIGH_PRIORITY`, `MED_PRIORITY`, `LOW_PRIORITY` + // and `WAIT_COMPLETE`. + virtual void Exec(TIntrusivePtr<ILocallyExecutable> exec, int id, int flags) = 0; + + // Add tasks range for further execution. + // + // @param exec Task description. + // @param firstId, lastId Task arguments [firstId, lastId) + // @param flags Same as for `Exec`. + virtual void ExecRange(TIntrusivePtr<ILocallyExecutable> exec, int firstId, int lastId, int flags) = 0; + + // 0-based ILocalExecutor worker thread identification + virtual int GetWorkerThreadId() const noexcept = 0; + virtual int GetThreadCount() const noexcept = 0; + // Describes a range of tasks with parameters from integer range [FirstId, LastId). // class TExecRangeParams { - public: - template <typename TFirst, typename TLast> - TExecRangeParams(TFirst firstId, TLast lastId) - : FirstId(SafeIntegerCast<int>(firstId)) - , LastId(SafeIntegerCast<int>(lastId)) - { - Y_ASSERT(LastId >= FirstId); + public: + template <typename TFirst, typename TLast> + TExecRangeParams(TFirst firstId, TLast lastId) + : FirstId(SafeIntegerCast<int>(firstId)) + , LastId(SafeIntegerCast<int>(lastId)) + { + Y_ASSERT(LastId >= FirstId); SetBlockSize(1); - } + } // Partition tasks into `blockCount` blocks of approximately equal size, each of which // will be executed as a separate bigger task. // - template <typename TBlockCount> - TExecRangeParams& SetBlockCount(TBlockCount blockCount) { + template <typename TBlockCount> + TExecRangeParams& SetBlockCount(TBlockCount blockCount) { Y_ASSERT(SafeIntegerCast<int>(blockCount) > 0 || FirstId == LastId); BlockSize = FirstId == LastId ? 0 : CeilDiv(LastId - FirstId, SafeIntegerCast<int>(blockCount)); BlockCount = BlockSize == 0 ? 0 : CeilDiv(LastId - FirstId, BlockSize); BlockEqualToThreads = false; - return *this; - } + return *this; + } // Partition tasks into blocks of approximately `blockSize` size, each of which will // be executed as a separate bigger task. // - template <typename TBlockSize> - TExecRangeParams& SetBlockSize(TBlockSize blockSize) { + template <typename TBlockSize> + TExecRangeParams& SetBlockSize(TBlockSize blockSize) { Y_ASSERT(SafeIntegerCast<int>(blockSize) > 0 || FirstId == LastId); - BlockSize = SafeIntegerCast<int>(blockSize); + BlockSize = SafeIntegerCast<int>(blockSize); BlockCount = BlockSize == 0 ? 0 : CeilDiv(LastId - FirstId, BlockSize); BlockEqualToThreads = false; - return *this; - } + return *this; + } // Partition tasks into thread count blocks of approximately equal size, each of which // will be executed as a separate bigger task. // @@ -99,27 +99,27 @@ namespace NPar { BlockEqualToThreads = true; return *this; } - int GetBlockCount() const { + int GetBlockCount() const { Y_ASSERT(!BlockEqualToThreads); - return BlockCount; - } - int GetBlockSize() const { + return BlockCount; + } + int GetBlockSize() const { Y_ASSERT(!BlockEqualToThreads); - return BlockSize; - } + return BlockSize; + } bool GetBlockEqualToThreads() { return BlockEqualToThreads; } - - const int FirstId = 0; - const int LastId = 0; - - private: + + const int FirstId = 0; + const int LastId = 0; + + private: int BlockSize; int BlockCount; bool BlockEqualToThreads; - }; - + }; + // `Exec` and `ExecRange` versions that accept functions. // void Exec(TLocallyExecutableFunction exec, int id, int flags); @@ -136,26 +136,26 @@ namespace NPar { TVector<NThreading::TFuture<void>> ExecRangeWithFutures(TLocallyExecutableFunction exec, int firstId, int lastId, int flags); template <typename TBody> - static inline auto BlockedLoopBody(const TExecRangeParams& params, const TBody& body) { + static inline auto BlockedLoopBody(const TExecRangeParams& params, const TBody& body) { return [=](int blockId) { const int blockFirstId = params.FirstId + blockId * params.GetBlockSize(); const int blockLastId = Min(params.LastId, blockFirstId + params.GetBlockSize()); - for (int i = blockFirstId; i < blockLastId; ++i) { - body(i); - } - }; - } - - template <typename TBody> + for (int i = blockFirstId; i < blockLastId; ++i) { + body(i); + } + }; + } + + template <typename TBody> inline void ExecRange(TBody&& body, TExecRangeParams params, int flags) { - if (TryExecRangeSequentially(body, params.FirstId, params.LastId, flags)) { + if (TryExecRangeSequentially(body, params.FirstId, params.LastId, flags)) { return; } if (params.GetBlockEqualToThreads()) { params.SetBlockCount(GetThreadCount() + ((flags & WAIT_COMPLETE) != 0)); // ThreadCount or ThreadCount+1 depending on WaitFlag - } + } ExecRange(BlockedLoopBody(params, body), 0, params.GetBlockCount(), flags); - } + } template <typename TBody> inline void ExecRangeBlockedWithThrow(TBody&& body, int firstId, int lastId, int batchSizeOrZeroForAutoBatchSize, int flags) { @@ -185,97 +185,97 @@ namespace NPar { } } - template <typename TBody> - static inline bool TryExecRangeSequentially(TBody&& body, int firstId, int lastId, int flags) { - if (lastId == firstId) { - return true; - } - if ((flags & WAIT_COMPLETE) && lastId - firstId == 1) { - body(firstId); - return true; - } - return false; - } + template <typename TBody> + static inline bool TryExecRangeSequentially(TBody&& body, int firstId, int lastId, int flags) { + if (lastId == firstId) { + return true; + } + if ((flags & WAIT_COMPLETE) && lastId - firstId == 1) { + body(firstId); + return true; + } + return false; + } + }; + + // `TLocalExecutor` provides facilities for easy parallelization of existing code and cycles. + // + // Examples: + // Execute one task with medium priority and wait for it completion. + // ``` + // LocalExecutor().Run(4); + // TEvent event; + // LocalExecutor().Exec([](int) { + // SomeFunc(); + // event.Signal(); + // }, 0, TLocalExecutor::MED_PRIORITY); + // + // SomeOtherCode(); + // event.WaitI(); + // ``` + // + // Execute range of tasks with medium priority. + // ``` + // LocalExecutor().Run(4); + // LocalExecutor().ExecRange([](int id) { + // SomeFunc(id); + // }, TExecRangeParams(0, 10), TLocalExecutor::WAIT_COMPLETE | TLocalExecutor::MED_PRIORITY); + // ``` + // + class TLocalExecutor final: public ILocalExecutor { + public: + using EFlags = ILocalExecutor::EFlags; + + // Creates executor without threads. You'll need to explicitly call `RunAdditionalThreads` + // to add threads to underlying thread pool. + // + TLocalExecutor(); + ~TLocalExecutor(); + + int GetQueueSize() const noexcept; + int GetMPQueueSize() const noexcept; + int GetLPQueueSize() const noexcept; + void ClearLPQueue(); + + // 0-based TLocalExecutor worker thread identification + int GetWorkerThreadId() const noexcept override; + int GetThreadCount() const noexcept override; + + // **Add** threads to underlying thread pool. + // + // @param threadCount Number of threads to add. + void RunAdditionalThreads(int threadCount); + + // Add task for further execution. + // + // @param exec Task description. + // @param id Task argument. + // @param flags Bitmask composed by `HIGH_PRIORITY`, `MED_PRIORITY`, `LOW_PRIORITY` + // and `WAIT_COMPLETE`. + void Exec(TIntrusivePtr<ILocallyExecutable> exec, int id, int flags) override; + + // Add tasks range for further execution. + // + // @param exec Task description. + // @param firstId, lastId Task arguments [firstId, lastId) + // @param flags Same as for `Exec`. + void ExecRange(TIntrusivePtr<ILocallyExecutable> exec, int firstId, int lastId, int flags) override; + + using ILocalExecutor::Exec; + using ILocalExecutor::ExecRange; + + private: + class TImpl; + THolder<TImpl> Impl_; }; - // `TLocalExecutor` provides facilities for easy parallelization of existing code and cycles. - // - // Examples: - // Execute one task with medium priority and wait for it completion. - // ``` - // LocalExecutor().Run(4); - // TEvent event; - // LocalExecutor().Exec([](int) { - // SomeFunc(); - // event.Signal(); - // }, 0, TLocalExecutor::MED_PRIORITY); - // - // SomeOtherCode(); - // event.WaitI(); - // ``` - // - // Execute range of tasks with medium priority. - // ``` - // LocalExecutor().Run(4); - // LocalExecutor().ExecRange([](int id) { - // SomeFunc(id); - // }, TExecRangeParams(0, 10), TLocalExecutor::WAIT_COMPLETE | TLocalExecutor::MED_PRIORITY); - // ``` - // - class TLocalExecutor final: public ILocalExecutor { - public: - using EFlags = ILocalExecutor::EFlags; - - // Creates executor without threads. You'll need to explicitly call `RunAdditionalThreads` - // to add threads to underlying thread pool. - // - TLocalExecutor(); - ~TLocalExecutor(); - - int GetQueueSize() const noexcept; - int GetMPQueueSize() const noexcept; - int GetLPQueueSize() const noexcept; - void ClearLPQueue(); - - // 0-based TLocalExecutor worker thread identification - int GetWorkerThreadId() const noexcept override; - int GetThreadCount() const noexcept override; - - // **Add** threads to underlying thread pool. - // - // @param threadCount Number of threads to add. - void RunAdditionalThreads(int threadCount); - - // Add task for further execution. - // - // @param exec Task description. - // @param id Task argument. - // @param flags Bitmask composed by `HIGH_PRIORITY`, `MED_PRIORITY`, `LOW_PRIORITY` - // and `WAIT_COMPLETE`. - void Exec(TIntrusivePtr<ILocallyExecutable> exec, int id, int flags) override; - - // Add tasks range for further execution. - // - // @param exec Task description. - // @param firstId, lastId Task arguments [firstId, lastId) - // @param flags Same as for `Exec`. - void ExecRange(TIntrusivePtr<ILocallyExecutable> exec, int firstId, int lastId, int flags) override; - - using ILocalExecutor::Exec; - using ILocalExecutor::ExecRange; - - private: - class TImpl; - THolder<TImpl> Impl_; - }; - static inline TLocalExecutor& LocalExecutor() { return *Singleton<TLocalExecutor>(); } template <typename TBody> - inline void ParallelFor(ILocalExecutor& executor, ui32 from, ui32 to, TBody&& body) { - ILocalExecutor::TExecRangeParams params(from, to); + inline void ParallelFor(ILocalExecutor& executor, ui32 from, ui32 to, TBody&& body) { + ILocalExecutor::TExecRangeParams params(from, to); params.SetBlockCountToThreadCount(); executor.ExecRange(std::forward<TBody>(body), params, TLocalExecutor::WAIT_COMPLETE); } @@ -287,7 +287,7 @@ namespace NPar { template <typename TBody> inline void AsyncParallelFor(ui32 from, ui32 to, TBody&& body) { - ILocalExecutor::TExecRangeParams params(from, to); + ILocalExecutor::TExecRangeParams params(from, to); params.SetBlockCountToThreadCount(); LocalExecutor().ExecRange(std::forward<TBody>(body), params, 0); } |