diff options
| author | Evgueni Petrov <[email protected]> | 2022-02-10 16:47:00 +0300 | 
|---|---|---|
| committer | Daniil Cherednik <[email protected]> | 2022-02-10 16:47:00 +0300 | 
| commit | 6bde7c5def28273dc3eb4b26959d640ce52e5d2f (patch) | |
| tree | b83306b6e37edeea782e9eed673d89286c4fef35 /library/cpp/threading/local_executor | |
| parent | 19d7d7947f95423df4b50d3a6e858cd689db06ed (diff) | |
Restoring authorship annotation for Evgueni Petrov <[email protected]>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/threading/local_executor')
| -rw-r--r-- | library/cpp/threading/local_executor/local_executor.cpp | 36 | ||||
| -rw-r--r-- | library/cpp/threading/local_executor/local_executor.h | 302 | 
2 files changed, 169 insertions, 169 deletions
| diff --git a/library/cpp/threading/local_executor/local_executor.cpp b/library/cpp/threading/local_executor/local_executor.cpp index 17f4fc636bc..1d3fbb4bf44 100644 --- a/library/cpp/threading/local_executor/local_executor.cpp +++ b/library/cpp/threading/local_executor/local_executor.cpp @@ -84,8 +84,8 @@ namespace {      class TLocalRangeExecutor: public NPar::ILocallyExecutable {          TIntrusivePtr<NPar::ILocallyExecutable> Exec; -        alignas(64) TAtomic Counter;  -        alignas(64) TAtomic WorkerCount;  +        alignas(64) TAtomic Counter; +        alignas(64) TAtomic WorkerCount;          int LastId;          void LocalExec(int) override { @@ -106,7 +106,7 @@ namespace {          {          }          bool DoSingleOp() { -            const int id = AtomicAdd(Counter, 1) - 1;  +            const int id = AtomicAdd(Counter, 1) - 1;              if (id >= LastId)                  return false;              Exec->LocalExec(id); @@ -116,7 +116,7 @@ namespace {          void WaitComplete() {              while (AtomicGet(WorkerCount) > 0)                  RegularYield(); -        }  +        }          int GetRangeSize() const {              return Max<int>(LastId - Counter, 0);          } @@ -130,10 +130,10 @@ public:      TLockFreeQueue<TSingleJob> JobQueue;      TLockFreeQueue<TSingleJob> MedJobQueue;      TLockFreeQueue<TSingleJob> LowJobQueue; -    alignas(64) TSystemEvent HasJob;  +    alignas(64) TSystemEvent HasJob;      TAtomic ThreadCount{0}; -    alignas(64) TAtomic QueueSize{0};  +    alignas(64) TAtomic QueueSize{0};      TAtomic MPQueueSize{0};      TAtomic LPQueueSize{0};      TAtomic ThreadId{0}; @@ -231,7 +231,7 @@ void NPar::TLocalExecutor::TImpl::LaunchRange(TIntrusivePtr<TLocalRangeExecutor>          return;      }      AtomicAdd(*queueSize, count); -    jobQueue->EnqueueAll(TVector<TSingleJob>{size_t(count), TSingleJob(rangeExec, 0)});  +    jobQueue->EnqueueAll(TVector<TSingleJob>{size_t(count), TSingleJob(rangeExec, 0)});      HasJob.Signal();  } @@ -269,13 +269,13 @@ void NPar::TLocalExecutor::Exec(TIntrusivePtr<ILocallyExecutable> exec, int id,      Impl_->HasJob.Signal();  } -void NPar::ILocalExecutor::Exec(TLocallyExecutableFunction exec, int id, int flags) {  +void NPar::ILocalExecutor::Exec(TLocallyExecutableFunction exec, int id, int flags) {      Exec(new TFunctionWrapper(std::move(exec)), id, flags);  }  void NPar::TLocalExecutor::ExecRange(TIntrusivePtr<ILocallyExecutable> exec, int firstId, int lastId, int flags) {      Y_ASSERT(lastId >= firstId); -    if (TryExecRangeSequentially([=] (int id) { exec->LocalExec(id); }, firstId, lastId, flags)) {  +    if (TryExecRangeSequentially([=] (int id) { exec->LocalExec(id); }, firstId, lastId, flags)) {          return;      }      auto rangeExec = MakeIntrusive<TLocalRangeExecutor>(std::move(exec), firstId, lastId); @@ -305,18 +305,18 @@ void NPar::TLocalExecutor::ExecRange(TIntrusivePtr<ILocallyExecutable> exec, int      }  } -void NPar::ILocalExecutor::ExecRange(TLocallyExecutableFunction exec, int firstId, int lastId, int flags) {  -    if (TryExecRangeSequentially(exec, firstId, lastId, flags)) {  -        return;  -    }  +void NPar::ILocalExecutor::ExecRange(TLocallyExecutableFunction exec, int firstId, int lastId, int flags) { +    if (TryExecRangeSequentially(exec, firstId, lastId, flags)) { +        return; +    }      ExecRange(new TFunctionWrapper(exec), firstId, lastId, flags);  } -void NPar::ILocalExecutor::ExecRangeWithThrow(TLocallyExecutableFunction exec, int firstId, int lastId, int flags) {  +void NPar::ILocalExecutor::ExecRangeWithThrow(TLocallyExecutableFunction exec, int firstId, int lastId, int flags) {      Y_VERIFY((flags & WAIT_COMPLETE) != 0, "ExecRangeWithThrow() requires WAIT_COMPLETE to wait if exceptions arise."); -    if (TryExecRangeSequentially(exec, firstId, lastId, flags)) {  -        return;  -    }  +    if (TryExecRangeSequentially(exec, firstId, lastId, flags)) { +        return; +    }      TVector<NThreading::TFuture<void>> currentRun = ExecRangeWithFutures(exec, firstId, lastId, flags);      for (auto& result : currentRun) {          result.GetValueSync(); // Exception will be rethrown if exists. If several exception - only the one with minimal id is rethrown. @@ -324,7 +324,7 @@ void NPar::ILocalExecutor::ExecRangeWithThrow(TLocallyExecutableFunction exec, i  }  TVector<NThreading::TFuture<void>> -NPar::ILocalExecutor::ExecRangeWithFutures(TLocallyExecutableFunction exec, int firstId, int lastId, int flags) {  +NPar::ILocalExecutor::ExecRangeWithFutures(TLocallyExecutableFunction exec, int firstId, int lastId, int flags) {      TFunctionWrapperWithPromise* execWrapper = new TFunctionWrapperWithPromise(exec, firstId, lastId);      TVector<NThreading::TFuture<void>> out = execWrapper->GetFutures();      ExecRange(execWrapper, firstId, lastId, flags); diff --git a/library/cpp/threading/local_executor/local_executor.h b/library/cpp/threading/local_executor/local_executor.h index cbde5c62f41..c1c824f67cb 100644 --- a/library/cpp/threading/local_executor/local_executor.h +++ b/library/cpp/threading/local_executor/local_executor.h @@ -2,7 +2,7 @@  #include <library/cpp/threading/future/future.h> -#include <util/generic/cast.h>  +#include <util/generic/cast.h>  #include <util/generic/fwd.h>  #include <util/generic/noncopyable.h>  #include <util/generic/ptr.h> @@ -26,11 +26,11 @@ namespace NPar {      //      using TLocallyExecutableFunction = std::function<void(int)>; -    class ILocalExecutor: public TNonCopyable {  +    class ILocalExecutor: public TNonCopyable {      public: -        ILocalExecutor() = default;  -        virtual ~ILocalExecutor() = default;  -  +        ILocalExecutor() = default; +        virtual ~ILocalExecutor() = default; +          enum EFlags : int {              HIGH_PRIORITY = 0,              MED_PRIORITY = 1, @@ -38,60 +38,60 @@ namespace NPar {              PRIORITY_MASK = 3,              WAIT_COMPLETE = 4          }; -  -        // Add task for further execution.  -        //  -        // @param exec          Task description.  -        // @param id            Task argument.  -        // @param flags         Bitmask composed by `HIGH_PRIORITY`, `MED_PRIORITY`, `LOW_PRIORITY`  -        //                      and `WAIT_COMPLETE`.  -        virtual void Exec(TIntrusivePtr<ILocallyExecutable> exec, int id, int flags) = 0;  -  -        // Add tasks range for further execution.  -        //  -        // @param exec                      Task description.  -        // @param firstId, lastId           Task arguments [firstId, lastId)  -        // @param flags                     Same as for `Exec`.  -        virtual void ExecRange(TIntrusivePtr<ILocallyExecutable> exec, int firstId, int lastId, int flags) = 0;  -  -        // 0-based ILocalExecutor worker thread identification  -        virtual int GetWorkerThreadId() const noexcept = 0;  -        virtual int GetThreadCount() const noexcept = 0;  -  + +        // Add task for further execution. +        // +        // @param exec          Task description. +        // @param id            Task argument. +        // @param flags         Bitmask composed by `HIGH_PRIORITY`, `MED_PRIORITY`, `LOW_PRIORITY` +        //                      and `WAIT_COMPLETE`. +        virtual void Exec(TIntrusivePtr<ILocallyExecutable> exec, int id, int flags) = 0; + +        // Add tasks range for further execution. +        // +        // @param exec                      Task description. +        // @param firstId, lastId           Task arguments [firstId, lastId) +        // @param flags                     Same as for `Exec`. +        virtual void ExecRange(TIntrusivePtr<ILocallyExecutable> exec, int firstId, int lastId, int flags) = 0; + +        // 0-based ILocalExecutor worker thread identification +        virtual int GetWorkerThreadId() const noexcept = 0; +        virtual int GetThreadCount() const noexcept = 0; +          // Describes a range of tasks with parameters from integer range [FirstId, LastId).          //          class TExecRangeParams { -        public:  -            template <typename TFirst, typename TLast>  -            TExecRangeParams(TFirst firstId, TLast lastId)  -                : FirstId(SafeIntegerCast<int>(firstId))  -                , LastId(SafeIntegerCast<int>(lastId))  -            {  -                Y_ASSERT(LastId >= FirstId);  +        public: +            template <typename TFirst, typename TLast> +            TExecRangeParams(TFirst firstId, TLast lastId) +                : FirstId(SafeIntegerCast<int>(firstId)) +                , LastId(SafeIntegerCast<int>(lastId)) +            { +                Y_ASSERT(LastId >= FirstId);                  SetBlockSize(1); -            }  +            }              // Partition tasks into `blockCount` blocks of approximately equal size, each of which              // will be executed as a separate bigger task.              // -            template <typename TBlockCount>  -            TExecRangeParams& SetBlockCount(TBlockCount blockCount) {  +            template <typename TBlockCount> +            TExecRangeParams& SetBlockCount(TBlockCount blockCount) {                  Y_ASSERT(SafeIntegerCast<int>(blockCount) > 0 || FirstId == LastId);                  BlockSize = FirstId == LastId ? 0 : CeilDiv(LastId - FirstId, SafeIntegerCast<int>(blockCount));                  BlockCount = BlockSize == 0 ? 0 : CeilDiv(LastId - FirstId, BlockSize);                  BlockEqualToThreads = false; -                return *this;  -            }  +                return *this; +            }              // Partition tasks into blocks of approximately `blockSize` size, each of which will              // be executed as a separate bigger task.              // -            template <typename TBlockSize>  -            TExecRangeParams& SetBlockSize(TBlockSize blockSize) {  +            template <typename TBlockSize> +            TExecRangeParams& SetBlockSize(TBlockSize blockSize) {                  Y_ASSERT(SafeIntegerCast<int>(blockSize) > 0 || FirstId == LastId); -                BlockSize = SafeIntegerCast<int>(blockSize);  +                BlockSize = SafeIntegerCast<int>(blockSize);                  BlockCount = BlockSize == 0 ? 0 : CeilDiv(LastId - FirstId, BlockSize);                  BlockEqualToThreads = false; -                return *this;  -            }  +                return *this; +            }              // Partition tasks into thread count blocks of approximately equal size, each of which              // will be executed as a separate bigger task.              // @@ -99,27 +99,27 @@ namespace NPar {                  BlockEqualToThreads = true;                  return *this;              } -            int GetBlockCount() const {  +            int GetBlockCount() const {                  Y_ASSERT(!BlockEqualToThreads); -                return BlockCount;  -            }  -            int GetBlockSize() const {  +                return BlockCount; +            } +            int GetBlockSize() const {                  Y_ASSERT(!BlockEqualToThreads); -                return BlockSize;  -            }  +                return BlockSize; +            }              bool GetBlockEqualToThreads() {                  return BlockEqualToThreads;              } -  -            const int FirstId = 0;  -            const int LastId = 0;  -  -        private:  + +            const int FirstId = 0; +            const int LastId = 0; + +        private:              int BlockSize;              int BlockCount;              bool BlockEqualToThreads; -        };  -  +        }; +          // `Exec` and `ExecRange` versions that accept functions.          //          void Exec(TLocallyExecutableFunction exec, int id, int flags); @@ -136,26 +136,26 @@ namespace NPar {          TVector<NThreading::TFuture<void>> ExecRangeWithFutures(TLocallyExecutableFunction exec, int firstId, int lastId, int flags);          template <typename TBody> -        static inline auto BlockedLoopBody(const TExecRangeParams& params, const TBody& body) {  +        static inline auto BlockedLoopBody(const TExecRangeParams& params, const TBody& body) {              return [=](int blockId) {                  const int blockFirstId = params.FirstId + blockId * params.GetBlockSize();                  const int blockLastId = Min(params.LastId, blockFirstId + params.GetBlockSize()); -                for (int i = blockFirstId; i < blockLastId; ++i) {  -                    body(i);  -                }  -            };  -        }  -  -        template <typename TBody>  +                for (int i = blockFirstId; i < blockLastId; ++i) { +                    body(i); +                } +            }; +        } + +        template <typename TBody>          inline void ExecRange(TBody&& body, TExecRangeParams params, int flags) { -            if (TryExecRangeSequentially(body, params.FirstId, params.LastId, flags)) {  +            if (TryExecRangeSequentially(body, params.FirstId, params.LastId, flags)) {                  return;              }              if (params.GetBlockEqualToThreads()) {                  params.SetBlockCount(GetThreadCount() + ((flags & WAIT_COMPLETE) != 0)); // ThreadCount or ThreadCount+1 depending on WaitFlag -            }  +            }              ExecRange(BlockedLoopBody(params, body), 0, params.GetBlockCount(), flags); -        }  +        }          template <typename TBody>          inline void ExecRangeBlockedWithThrow(TBody&& body, int firstId, int lastId, int batchSizeOrZeroForAutoBatchSize, int flags) { @@ -185,97 +185,97 @@ namespace NPar {              }          } -        template <typename TBody>  -        static inline bool TryExecRangeSequentially(TBody&& body, int firstId, int lastId, int flags) {  -            if (lastId == firstId) {  -                return true;  -            }  -            if ((flags & WAIT_COMPLETE) && lastId - firstId == 1) {  -                body(firstId);  -                return true;  -            }  -            return false;  -        }  +        template <typename TBody> +        static inline bool TryExecRangeSequentially(TBody&& body, int firstId, int lastId, int flags) { +            if (lastId == firstId) { +                return true; +            } +            if ((flags & WAIT_COMPLETE) && lastId - firstId == 1) { +                body(firstId); +                return true; +            } +            return false; +        } +    }; + +    // `TLocalExecutor` provides facilities for easy parallelization of existing code and cycles. +    // +    // Examples: +    // Execute one task with medium priority and wait for it completion. +    // ``` +    // LocalExecutor().Run(4); +    // TEvent event; +    // LocalExecutor().Exec([](int) { +    //     SomeFunc(); +    //     event.Signal(); +    // }, 0, TLocalExecutor::MED_PRIORITY); +    // +    // SomeOtherCode(); +    // event.WaitI(); +    // ``` +    // +    // Execute range of tasks with medium priority. +    // ``` +    // LocalExecutor().Run(4); +    // LocalExecutor().ExecRange([](int id) { +    //     SomeFunc(id); +    // }, TExecRangeParams(0, 10), TLocalExecutor::WAIT_COMPLETE | TLocalExecutor::MED_PRIORITY); +    // ``` +    // +    class TLocalExecutor final: public ILocalExecutor { +    public: +        using EFlags = ILocalExecutor::EFlags; + +        // Creates executor without threads. You'll need to explicitly call `RunAdditionalThreads` +        // to add threads to underlying thread pool. +        // +        TLocalExecutor(); +        ~TLocalExecutor(); + +        int GetQueueSize() const noexcept; +        int GetMPQueueSize() const noexcept; +        int GetLPQueueSize() const noexcept; +        void ClearLPQueue(); + +        // 0-based TLocalExecutor worker thread identification +        int GetWorkerThreadId() const noexcept override; +        int GetThreadCount() const noexcept override; + +        // **Add** threads to underlying thread pool. +        // +        // @param threadCount       Number of threads to add. +        void RunAdditionalThreads(int threadCount); + +        // Add task for further execution. +        // +        // @param exec          Task description. +        // @param id            Task argument. +        // @param flags         Bitmask composed by `HIGH_PRIORITY`, `MED_PRIORITY`, `LOW_PRIORITY` +        //                      and `WAIT_COMPLETE`. +        void Exec(TIntrusivePtr<ILocallyExecutable> exec, int id, int flags) override; + +        // Add tasks range for further execution. +        // +        // @param exec                      Task description. +        // @param firstId, lastId           Task arguments [firstId, lastId) +        // @param flags                     Same as for `Exec`. +        void ExecRange(TIntrusivePtr<ILocallyExecutable> exec, int firstId, int lastId, int flags) override; + +        using ILocalExecutor::Exec; +        using ILocalExecutor::ExecRange; + +    private: +        class TImpl; +        THolder<TImpl> Impl_;      }; -    // `TLocalExecutor` provides facilities for easy parallelization of existing code and cycles.  -    //  -    // Examples:  -    // Execute one task with medium priority and wait for it completion.  -    // ```  -    // LocalExecutor().Run(4);  -    // TEvent event;  -    // LocalExecutor().Exec([](int) {  -    //     SomeFunc();  -    //     event.Signal();  -    // }, 0, TLocalExecutor::MED_PRIORITY);  -    //  -    // SomeOtherCode();  -    // event.WaitI();  -    // ```  -    //  -    // Execute range of tasks with medium priority.  -    // ```  -    // LocalExecutor().Run(4);  -    // LocalExecutor().ExecRange([](int id) {  -    //     SomeFunc(id);  -    // }, TExecRangeParams(0, 10), TLocalExecutor::WAIT_COMPLETE | TLocalExecutor::MED_PRIORITY);  -    // ```  -    //  -    class TLocalExecutor final: public ILocalExecutor {  -    public:  -        using EFlags = ILocalExecutor::EFlags;  -  -        // Creates executor without threads. You'll need to explicitly call `RunAdditionalThreads`  -        // to add threads to underlying thread pool.  -        //  -        TLocalExecutor();  -        ~TLocalExecutor();  -  -        int GetQueueSize() const noexcept;  -        int GetMPQueueSize() const noexcept;  -        int GetLPQueueSize() const noexcept;  -        void ClearLPQueue();  -  -        // 0-based TLocalExecutor worker thread identification  -        int GetWorkerThreadId() const noexcept override;  -        int GetThreadCount() const noexcept override;  -  -        // **Add** threads to underlying thread pool.  -        //  -        // @param threadCount       Number of threads to add.  -        void RunAdditionalThreads(int threadCount);  -  -        // Add task for further execution.  -        //  -        // @param exec          Task description.  -        // @param id            Task argument.  -        // @param flags         Bitmask composed by `HIGH_PRIORITY`, `MED_PRIORITY`, `LOW_PRIORITY`  -        //                      and `WAIT_COMPLETE`.  -        void Exec(TIntrusivePtr<ILocallyExecutable> exec, int id, int flags) override;  -  -        // Add tasks range for further execution.  -        //  -        // @param exec                      Task description.  -        // @param firstId, lastId           Task arguments [firstId, lastId)  -        // @param flags                     Same as for `Exec`.  -        void ExecRange(TIntrusivePtr<ILocallyExecutable> exec, int firstId, int lastId, int flags) override;  -  -        using ILocalExecutor::Exec;  -        using ILocalExecutor::ExecRange;  -  -    private:  -        class TImpl;  -        THolder<TImpl> Impl_;  -    };  -       static inline TLocalExecutor& LocalExecutor() {          return *Singleton<TLocalExecutor>();      }      template <typename TBody> -    inline void ParallelFor(ILocalExecutor& executor, ui32 from, ui32 to, TBody&& body) {  -        ILocalExecutor::TExecRangeParams params(from, to);  +    inline void ParallelFor(ILocalExecutor& executor, ui32 from, ui32 to, TBody&& body) { +        ILocalExecutor::TExecRangeParams params(from, to);          params.SetBlockCountToThreadCount();          executor.ExecRange(std::forward<TBody>(body), params, TLocalExecutor::WAIT_COMPLETE);      } @@ -287,7 +287,7 @@ namespace NPar {      template <typename TBody>      inline void AsyncParallelFor(ui32 from, ui32 to, TBody&& body) { -        ILocalExecutor::TExecRangeParams params(from, to);  +        ILocalExecutor::TExecRangeParams params(from, to);          params.SetBlockCountToThreadCount();          LocalExecutor().ExecRange(std::forward<TBody>(body), params, 0);      } | 
