diff options
author | Devtools Arcadia <[email protected]> | 2022-02-07 18:08:42 +0300 |
---|---|---|
committer | Devtools Arcadia <[email protected]> | 2022-02-07 18:08:42 +0300 |
commit | 1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch) | |
tree | e26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/threading/local_executor |
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/threading/local_executor')
-rw-r--r-- | library/cpp/threading/local_executor/README.md | 74 | ||||
-rw-r--r-- | library/cpp/threading/local_executor/local_executor.cpp | 369 | ||||
-rw-r--r-- | library/cpp/threading/local_executor/local_executor.h | 294 | ||||
-rw-r--r-- | library/cpp/threading/local_executor/tbb_local_executor.cpp | 53 | ||||
-rw-r--r-- | library/cpp/threading/local_executor/tbb_local_executor.h | 49 | ||||
-rw-r--r-- | library/cpp/threading/local_executor/ut/local_executor_ut.cpp | 371 | ||||
-rw-r--r-- | library/cpp/threading/local_executor/ut/ya.make | 12 | ||||
-rw-r--r-- | library/cpp/threading/local_executor/ya.make | 20 |
8 files changed, 1242 insertions, 0 deletions
diff --git a/library/cpp/threading/local_executor/README.md b/library/cpp/threading/local_executor/README.md new file mode 100644 index 00000000000..aaad2e2986c --- /dev/null +++ b/library/cpp/threading/local_executor/README.md @@ -0,0 +1,74 @@ +# Library for parallel task execution in thread pool + +This library allows easy parallelization of existing code and cycles. +It provides `NPar::TLocalExecutor` class and `NPar::LocalExecutor()` singleton accessor. +At start, `TLocalExecutor` has no threads in thread pool and all async tasks will be queued for later execution when extra threads appear. +All tasks should be `NPar::ILocallyExecutable` child class or function equal to `std::function<void(int)>` + +## TLocalExecutor methods + +`TLocalExecutor::Run(int threadcount)` - add threads to thread pool (**WARNING!** `Run(threadcount)` will *add* `threadcount` threads to pool) + +`void TLocalExecutor::Exec(TLocallyExecutableFunction exec, int id, int flags)` - run one task and pass id as task function input, flags - bitmask composition of: + +- `TLocalExecutor::HIGH_PRIORITY = 0` - put task in high priority queue +- `TLocalExecutor::MED_PRIORITY = 1` - put task in medium priority queue +- `TLocalExecutor::LOW_PRIORITY = 2` - put task in low priority queue +- `TLocalExecutor::WAIT_COMPLETE = 4` - wait for task completion + +`void TLocalExecutor::ExecRange(TLocallyExecutableFunction exec, TExecRangeParams blockParams, int flags);` - run range of tasks `[TExecRangeParams::FirstId, TExecRangeParams::LastId).` + +`flags` is the same as for `TLocalExecutor::Exec`. + +`TExecRangeParams` is a structure that describes the range. +By default each task is executed separately. Threads from thread pool are taking +the tasks in the manner first come first serve. + +It is also possible to partition range of tasks in consequtive blocks and execute each block as a bigger task. +`TExecRangeParams::SetBlockCountToThreadCount()` will result in thread count tasks, + where thread count is the count of threads in thread pool. + each thread will execute approximately equal count of tasks from range. + +`TExecRangeParams::SetBlockSize()` and `TExecRangeParams::SetBlockCount()` will partition +the range of tasks into consequtive blocks of approximately given size, or of size calculated + by partitioning the range into approximately equal size blocks of given count. + +## Examples + +### Simple task async exec with medium priority + +```cpp +using namespace NPar; + +LocalExecutor().Run(4); +TEvent event; +LocalExecutor().Exec([](int) { + SomeFunc(); + event.Signal(); +}, 0, TLocalExecutor::MED_PRIORITY); + +SomeOtherCode(); +event.WaitI(); +``` + +### Execute task range and wait completion + +```cpp +using namespace NPar; + +LocalExecutor().Run(4); +LocalExecutor().ExecRange([](int id) { + SomeFunc(id); +}, TExecRangeParams(0, 10), TLocalExecutor::WAIT_COMPLETE | TLocalExecutor::MED_PRIORITY); +``` + +### Exception handling + +By default if a not caught exception arise in a task which runs through the Local Executor, then std::terminate() will be called immediately. The exception will be printed to stderr before the termination. Best practice is to handle exception within a task, or avoid throwing exceptions at all for performance reasons. + +However, if you'd like to handle and/or rethrow exceptions outside of a range, you can use ExecRangeWithFuture(). +It returns vector [0 .. LastId-FirstId] elements, where i-th element is a TFuture corresponding to task with id = (FirstId + i). +Use method .HasValue() of the element to check in Async mode if the corresponding task is complete. +Use .GetValue() or .GetValueSync() to wait for completion of the corresponding task. GetValue() and GetValueSync() will also rethrow an exception if it appears during execution of the task. + +You may also use ExecRangeWithThrow() to just receive an exception from a range if it appears. It rethrows an exception from a task with minimal id if such an exception exists, and guarantees normal flow if no exception arise. diff --git a/library/cpp/threading/local_executor/local_executor.cpp b/library/cpp/threading/local_executor/local_executor.cpp new file mode 100644 index 00000000000..1d3fbb4bf44 --- /dev/null +++ b/library/cpp/threading/local_executor/local_executor.cpp @@ -0,0 +1,369 @@ +#include "local_executor.h" + +#include <library/cpp/threading/future/future.h> + +#include <util/generic/utility.h> +#include <util/system/atomic.h> +#include <util/system/event.h> +#include <util/system/thread.h> +#include <util/system/tls.h> +#include <util/system/yield.h> +#include <util/thread/lfqueue.h> + +#include <utility> + +#ifdef _win_ +static void RegularYield() { +} +#else +// unix actually has cooperative multitasking! :) +// without this function program runs slower and system lags for some magic reason +static void RegularYield() { + SchedYield(); +} +#endif + +namespace { + struct TFunctionWrapper : NPar::ILocallyExecutable { + NPar::TLocallyExecutableFunction Exec; + TFunctionWrapper(NPar::TLocallyExecutableFunction exec) + : Exec(std::move(exec)) + { + } + void LocalExec(int id) override { + Exec(id); + } + }; + + class TFunctionWrapperWithPromise: public NPar::ILocallyExecutable { + private: + NPar::TLocallyExecutableFunction Exec; + int FirstId, LastId; + TVector<NThreading::TPromise<void>> Promises; + + public: + TFunctionWrapperWithPromise(NPar::TLocallyExecutableFunction exec, int firstId, int lastId) + : Exec(std::move(exec)) + , FirstId(firstId) + , LastId(lastId) + { + Y_ASSERT(FirstId <= LastId); + const int rangeSize = LastId - FirstId; + Promises.resize(rangeSize, NThreading::NewPromise()); + for (auto& promise : Promises) { + promise = NThreading::NewPromise(); + } + } + + void LocalExec(int id) override { + Y_ASSERT(FirstId <= id && id < LastId); + NThreading::NImpl::SetValue(Promises[id - FirstId], [=] { Exec(id); }); + } + + TVector<NThreading::TFuture<void>> GetFutures() const { + TVector<NThreading::TFuture<void>> out; + out.reserve(Promises.ysize()); + for (auto& promise : Promises) { + out.push_back(promise.GetFuture()); + } + return out; + } + }; + + struct TSingleJob { + TIntrusivePtr<NPar::ILocallyExecutable> Exec; + int Id{0}; + + TSingleJob() = default; + TSingleJob(TIntrusivePtr<NPar::ILocallyExecutable> exec, int id) + : Exec(std::move(exec)) + , Id(id) + { + } + }; + + class TLocalRangeExecutor: public NPar::ILocallyExecutable { + TIntrusivePtr<NPar::ILocallyExecutable> Exec; + alignas(64) TAtomic Counter; + alignas(64) TAtomic WorkerCount; + int LastId; + + void LocalExec(int) override { + AtomicAdd(WorkerCount, 1); + for (;;) { + if (!DoSingleOp()) + break; + } + AtomicAdd(WorkerCount, -1); + } + + public: + TLocalRangeExecutor(TIntrusivePtr<ILocallyExecutable> exec, int firstId, int lastId) + : Exec(std::move(exec)) + , Counter(firstId) + , WorkerCount(0) + , LastId(lastId) + { + } + bool DoSingleOp() { + const int id = AtomicAdd(Counter, 1) - 1; + if (id >= LastId) + return false; + Exec->LocalExec(id); + RegularYield(); + return true; + } + void WaitComplete() { + while (AtomicGet(WorkerCount) > 0) + RegularYield(); + } + int GetRangeSize() const { + return Max<int>(LastId - Counter, 0); + } + }; + +} + +////////////////////////////////////////////////////////////////////////// +class NPar::TLocalExecutor::TImpl { +public: + TLockFreeQueue<TSingleJob> JobQueue; + TLockFreeQueue<TSingleJob> MedJobQueue; + TLockFreeQueue<TSingleJob> LowJobQueue; + alignas(64) TSystemEvent HasJob; + + TAtomic ThreadCount{0}; + alignas(64) TAtomic QueueSize{0}; + TAtomic MPQueueSize{0}; + TAtomic LPQueueSize{0}; + TAtomic ThreadId{0}; + + Y_THREAD(int) + CurrentTaskPriority; + Y_THREAD(int) + WorkerThreadId; + + static void* HostWorkerThread(void* p); + bool GetJob(TSingleJob* job); + void RunNewThread(); + void LaunchRange(TIntrusivePtr<TLocalRangeExecutor> execRange, int queueSizeLimit, + TAtomic* queueSize, TLockFreeQueue<TSingleJob>* jobQueue); + + TImpl() = default; + ~TImpl(); +}; + +NPar::TLocalExecutor::TImpl::~TImpl() { + AtomicAdd(QueueSize, 1); + JobQueue.Enqueue(TSingleJob(nullptr, 0)); + HasJob.Signal(); + while (AtomicGet(ThreadCount)) { + ThreadYield(); + } +} + +void* NPar::TLocalExecutor::TImpl::HostWorkerThread(void* p) { + static const int FAST_ITERATIONS = 200; + + auto* const ctx = (TImpl*)p; + TThread::SetCurrentThreadName("ParLocalExecutor"); + ctx->WorkerThreadId = AtomicAdd(ctx->ThreadId, 1); + for (bool cont = true; cont;) { + TSingleJob job; + bool gotJob = false; + for (int iter = 0; iter < FAST_ITERATIONS; ++iter) { + if (ctx->GetJob(&job)) { + gotJob = true; + break; + } + } + if (!gotJob) { + ctx->HasJob.Reset(); + if (!ctx->GetJob(&job)) { + ctx->HasJob.Wait(); + continue; + } + } + if (job.Exec.Get()) { + job.Exec->LocalExec(job.Id); + RegularYield(); + } else { + AtomicAdd(ctx->QueueSize, 1); + ctx->JobQueue.Enqueue(job); + ctx->HasJob.Signal(); + cont = false; + } + } + AtomicAdd(ctx->ThreadCount, -1); + return nullptr; +} + +bool NPar::TLocalExecutor::TImpl::GetJob(TSingleJob* job) { + if (JobQueue.Dequeue(job)) { + CurrentTaskPriority = TLocalExecutor::HIGH_PRIORITY; + AtomicAdd(QueueSize, -1); + return true; + } else if (MedJobQueue.Dequeue(job)) { + CurrentTaskPriority = TLocalExecutor::MED_PRIORITY; + AtomicAdd(MPQueueSize, -1); + return true; + } else if (LowJobQueue.Dequeue(job)) { + CurrentTaskPriority = TLocalExecutor::LOW_PRIORITY; + AtomicAdd(LPQueueSize, -1); + return true; + } + return false; +} + +void NPar::TLocalExecutor::TImpl::RunNewThread() { + AtomicAdd(ThreadCount, 1); + TThread thr(HostWorkerThread, this); + thr.Start(); + thr.Detach(); +} + +void NPar::TLocalExecutor::TImpl::LaunchRange(TIntrusivePtr<TLocalRangeExecutor> rangeExec, + int queueSizeLimit, + TAtomic* queueSize, + TLockFreeQueue<TSingleJob>* jobQueue) { + int count = Min<int>(ThreadCount + 1, rangeExec->GetRangeSize()); + if (queueSizeLimit >= 0 && AtomicGet(*queueSize) >= queueSizeLimit) { + return; + } + AtomicAdd(*queueSize, count); + jobQueue->EnqueueAll(TVector<TSingleJob>{size_t(count), TSingleJob(rangeExec, 0)}); + HasJob.Signal(); +} + +NPar::TLocalExecutor::TLocalExecutor() + : Impl_{MakeHolder<TImpl>()} { +} + +NPar::TLocalExecutor::~TLocalExecutor() = default; + +void NPar::TLocalExecutor::RunAdditionalThreads(int threadCount) { + for (int i = 0; i < threadCount; i++) + Impl_->RunNewThread(); +} + +void NPar::TLocalExecutor::Exec(TIntrusivePtr<ILocallyExecutable> exec, int id, int flags) { + Y_ASSERT((flags & WAIT_COMPLETE) == 0); // unsupported + int prior = Max<int>(Impl_->CurrentTaskPriority, flags & PRIORITY_MASK); + switch (prior) { + case HIGH_PRIORITY: + AtomicAdd(Impl_->QueueSize, 1); + Impl_->JobQueue.Enqueue(TSingleJob(std::move(exec), id)); + break; + case MED_PRIORITY: + AtomicAdd(Impl_->MPQueueSize, 1); + Impl_->MedJobQueue.Enqueue(TSingleJob(std::move(exec), id)); + break; + case LOW_PRIORITY: + AtomicAdd(Impl_->LPQueueSize, 1); + Impl_->LowJobQueue.Enqueue(TSingleJob(std::move(exec), id)); + break; + default: + Y_ASSERT(0); + break; + } + Impl_->HasJob.Signal(); +} + +void NPar::ILocalExecutor::Exec(TLocallyExecutableFunction exec, int id, int flags) { + Exec(new TFunctionWrapper(std::move(exec)), id, flags); +} + +void NPar::TLocalExecutor::ExecRange(TIntrusivePtr<ILocallyExecutable> exec, int firstId, int lastId, int flags) { + Y_ASSERT(lastId >= firstId); + if (TryExecRangeSequentially([=] (int id) { exec->LocalExec(id); }, firstId, lastId, flags)) { + return; + } + auto rangeExec = MakeIntrusive<TLocalRangeExecutor>(std::move(exec), firstId, lastId); + int queueSizeLimit = (flags & WAIT_COMPLETE) ? 10000 : -1; + int prior = Max<int>(Impl_->CurrentTaskPriority, flags & PRIORITY_MASK); + switch (prior) { + case HIGH_PRIORITY: + Impl_->LaunchRange(rangeExec, queueSizeLimit, &Impl_->QueueSize, &Impl_->JobQueue); + break; + case MED_PRIORITY: + Impl_->LaunchRange(rangeExec, queueSizeLimit, &Impl_->MPQueueSize, &Impl_->MedJobQueue); + break; + case LOW_PRIORITY: + Impl_->LaunchRange(rangeExec, queueSizeLimit, &Impl_->LPQueueSize, &Impl_->LowJobQueue); + break; + default: + Y_ASSERT(0); + break; + } + if (flags & WAIT_COMPLETE) { + int keepPrior = Impl_->CurrentTaskPriority; + Impl_->CurrentTaskPriority = prior; + while (rangeExec->DoSingleOp()) { + } + Impl_->CurrentTaskPriority = keepPrior; + rangeExec->WaitComplete(); + } +} + +void NPar::ILocalExecutor::ExecRange(TLocallyExecutableFunction exec, int firstId, int lastId, int flags) { + if (TryExecRangeSequentially(exec, firstId, lastId, flags)) { + return; + } + ExecRange(new TFunctionWrapper(exec), firstId, lastId, flags); +} + +void NPar::ILocalExecutor::ExecRangeWithThrow(TLocallyExecutableFunction exec, int firstId, int lastId, int flags) { + Y_VERIFY((flags & WAIT_COMPLETE) != 0, "ExecRangeWithThrow() requires WAIT_COMPLETE to wait if exceptions arise."); + if (TryExecRangeSequentially(exec, firstId, lastId, flags)) { + return; + } + TVector<NThreading::TFuture<void>> currentRun = ExecRangeWithFutures(exec, firstId, lastId, flags); + for (auto& result : currentRun) { + result.GetValueSync(); // Exception will be rethrown if exists. If several exception - only the one with minimal id is rethrown. + } +} + +TVector<NThreading::TFuture<void>> +NPar::ILocalExecutor::ExecRangeWithFutures(TLocallyExecutableFunction exec, int firstId, int lastId, int flags) { + TFunctionWrapperWithPromise* execWrapper = new TFunctionWrapperWithPromise(exec, firstId, lastId); + TVector<NThreading::TFuture<void>> out = execWrapper->GetFutures(); + ExecRange(execWrapper, firstId, lastId, flags); + return out; +} + +void NPar::TLocalExecutor::ClearLPQueue() { + for (bool cont = true; cont;) { + cont = false; + TSingleJob job; + while (Impl_->LowJobQueue.Dequeue(&job)) { + AtomicAdd(Impl_->LPQueueSize, -1); + cont = true; + } + while (Impl_->MedJobQueue.Dequeue(&job)) { + AtomicAdd(Impl_->MPQueueSize, -1); + cont = true; + } + } +} + +int NPar::TLocalExecutor::GetQueueSize() const noexcept { + return AtomicGet(Impl_->QueueSize); +} + +int NPar::TLocalExecutor::GetMPQueueSize() const noexcept { + return AtomicGet(Impl_->MPQueueSize); +} + +int NPar::TLocalExecutor::GetLPQueueSize() const noexcept { + return AtomicGet(Impl_->LPQueueSize); +} + +int NPar::TLocalExecutor::GetWorkerThreadId() const noexcept { + return Impl_->WorkerThreadId; +} + +int NPar::TLocalExecutor::GetThreadCount() const noexcept { + return AtomicGet(Impl_->ThreadCount); +} + +////////////////////////////////////////////////////////////////////////// diff --git a/library/cpp/threading/local_executor/local_executor.h b/library/cpp/threading/local_executor/local_executor.h new file mode 100644 index 00000000000..c1c824f67cb --- /dev/null +++ b/library/cpp/threading/local_executor/local_executor.h @@ -0,0 +1,294 @@ +#pragma once + +#include <library/cpp/threading/future/future.h> + +#include <util/generic/cast.h> +#include <util/generic/fwd.h> +#include <util/generic/noncopyable.h> +#include <util/generic/ptr.h> +#include <util/generic/singleton.h> +#include <util/generic/ymath.h> + +#include <functional> + +namespace NPar { + struct ILocallyExecutable : virtual public TThrRefBase { + // Must be implemented by the end user to define job that will be processed by one of + // executor threads. + // + // @param id Job parameter, typically an index pointing somewhere in array, or just + // some dummy value, e.g. `0`. + virtual void LocalExec(int id) = 0; + }; + + // Alternative and simpler way of describing a job for executor. Function argument has the + // same meaning as `id` in `ILocallyExecutable::LocalExec`. + // + using TLocallyExecutableFunction = std::function<void(int)>; + + class ILocalExecutor: public TNonCopyable { + public: + ILocalExecutor() = default; + virtual ~ILocalExecutor() = default; + + enum EFlags : int { + HIGH_PRIORITY = 0, + MED_PRIORITY = 1, + LOW_PRIORITY = 2, + PRIORITY_MASK = 3, + WAIT_COMPLETE = 4 + }; + + // Add task for further execution. + // + // @param exec Task description. + // @param id Task argument. + // @param flags Bitmask composed by `HIGH_PRIORITY`, `MED_PRIORITY`, `LOW_PRIORITY` + // and `WAIT_COMPLETE`. + virtual void Exec(TIntrusivePtr<ILocallyExecutable> exec, int id, int flags) = 0; + + // Add tasks range for further execution. + // + // @param exec Task description. + // @param firstId, lastId Task arguments [firstId, lastId) + // @param flags Same as for `Exec`. + virtual void ExecRange(TIntrusivePtr<ILocallyExecutable> exec, int firstId, int lastId, int flags) = 0; + + // 0-based ILocalExecutor worker thread identification + virtual int GetWorkerThreadId() const noexcept = 0; + virtual int GetThreadCount() const noexcept = 0; + + // Describes a range of tasks with parameters from integer range [FirstId, LastId). + // + class TExecRangeParams { + public: + template <typename TFirst, typename TLast> + TExecRangeParams(TFirst firstId, TLast lastId) + : FirstId(SafeIntegerCast<int>(firstId)) + , LastId(SafeIntegerCast<int>(lastId)) + { + Y_ASSERT(LastId >= FirstId); + SetBlockSize(1); + } + // Partition tasks into `blockCount` blocks of approximately equal size, each of which + // will be executed as a separate bigger task. + // + template <typename TBlockCount> + TExecRangeParams& SetBlockCount(TBlockCount blockCount) { + Y_ASSERT(SafeIntegerCast<int>(blockCount) > 0 || FirstId == LastId); + BlockSize = FirstId == LastId ? 0 : CeilDiv(LastId - FirstId, SafeIntegerCast<int>(blockCount)); + BlockCount = BlockSize == 0 ? 0 : CeilDiv(LastId - FirstId, BlockSize); + BlockEqualToThreads = false; + return *this; + } + // Partition tasks into blocks of approximately `blockSize` size, each of which will + // be executed as a separate bigger task. + // + template <typename TBlockSize> + TExecRangeParams& SetBlockSize(TBlockSize blockSize) { + Y_ASSERT(SafeIntegerCast<int>(blockSize) > 0 || FirstId == LastId); + BlockSize = SafeIntegerCast<int>(blockSize); + BlockCount = BlockSize == 0 ? 0 : CeilDiv(LastId - FirstId, BlockSize); + BlockEqualToThreads = false; + return *this; + } + // Partition tasks into thread count blocks of approximately equal size, each of which + // will be executed as a separate bigger task. + // + TExecRangeParams& SetBlockCountToThreadCount() { + BlockEqualToThreads = true; + return *this; + } + int GetBlockCount() const { + Y_ASSERT(!BlockEqualToThreads); + return BlockCount; + } + int GetBlockSize() const { + Y_ASSERT(!BlockEqualToThreads); + return BlockSize; + } + bool GetBlockEqualToThreads() { + return BlockEqualToThreads; + } + + const int FirstId = 0; + const int LastId = 0; + + private: + int BlockSize; + int BlockCount; + bool BlockEqualToThreads; + }; + + // `Exec` and `ExecRange` versions that accept functions. + // + void Exec(TLocallyExecutableFunction exec, int id, int flags); + void ExecRange(TLocallyExecutableFunction exec, int firstId, int lastId, int flags); + + // Version of `ExecRange` that throws exception from task with minimal id if at least one of + // task threw an exception. + // + void ExecRangeWithThrow(TLocallyExecutableFunction exec, int firstId, int lastId, int flags); + + // Version of `ExecRange` that returns vector of futures, thus allowing to retry any task if + // it fails. + // + TVector<NThreading::TFuture<void>> ExecRangeWithFutures(TLocallyExecutableFunction exec, int firstId, int lastId, int flags); + + template <typename TBody> + static inline auto BlockedLoopBody(const TExecRangeParams& params, const TBody& body) { + return [=](int blockId) { + const int blockFirstId = params.FirstId + blockId * params.GetBlockSize(); + const int blockLastId = Min(params.LastId, blockFirstId + params.GetBlockSize()); + for (int i = blockFirstId; i < blockLastId; ++i) { + body(i); + } + }; + } + + template <typename TBody> + inline void ExecRange(TBody&& body, TExecRangeParams params, int flags) { + if (TryExecRangeSequentially(body, params.FirstId, params.LastId, flags)) { + return; + } + if (params.GetBlockEqualToThreads()) { + params.SetBlockCount(GetThreadCount() + ((flags & WAIT_COMPLETE) != 0)); // ThreadCount or ThreadCount+1 depending on WaitFlag + } + ExecRange(BlockedLoopBody(params, body), 0, params.GetBlockCount(), flags); + } + + template <typename TBody> + inline void ExecRangeBlockedWithThrow(TBody&& body, int firstId, int lastId, int batchSizeOrZeroForAutoBatchSize, int flags) { + if (firstId >= lastId) { + return; + } + const int threadCount = Max(GetThreadCount(), 1); + const int batchSize = batchSizeOrZeroForAutoBatchSize + ? batchSizeOrZeroForAutoBatchSize + : (lastId - firstId + threadCount - 1) / threadCount; + const int batchCount = (lastId - firstId + batchSize - 1) / batchSize; + const int batchCountPerThread = (batchCount + threadCount - 1) / threadCount; + auto states = ExecRangeWithFutures( + [=](int threadId) { + for (int batchIdPerThread = 0; batchIdPerThread < batchCountPerThread; ++batchIdPerThread) { + int batchId = batchIdPerThread * threadCount + threadId; + int begin = firstId + batchId * batchSize; + int end = Min(begin + batchSize, lastId); + for (int i = begin; i < end; ++i) { + body(i); + } + } + }, + 0, threadCount, flags); + for (auto& state: states) { + state.GetValueSync(); // Re-throw exception if any. + } + } + + template <typename TBody> + static inline bool TryExecRangeSequentially(TBody&& body, int firstId, int lastId, int flags) { + if (lastId == firstId) { + return true; + } + if ((flags & WAIT_COMPLETE) && lastId - firstId == 1) { + body(firstId); + return true; + } + return false; + } + }; + + // `TLocalExecutor` provides facilities for easy parallelization of existing code and cycles. + // + // Examples: + // Execute one task with medium priority and wait for it completion. + // ``` + // LocalExecutor().Run(4); + // TEvent event; + // LocalExecutor().Exec([](int) { + // SomeFunc(); + // event.Signal(); + // }, 0, TLocalExecutor::MED_PRIORITY); + // + // SomeOtherCode(); + // event.WaitI(); + // ``` + // + // Execute range of tasks with medium priority. + // ``` + // LocalExecutor().Run(4); + // LocalExecutor().ExecRange([](int id) { + // SomeFunc(id); + // }, TExecRangeParams(0, 10), TLocalExecutor::WAIT_COMPLETE | TLocalExecutor::MED_PRIORITY); + // ``` + // + class TLocalExecutor final: public ILocalExecutor { + public: + using EFlags = ILocalExecutor::EFlags; + + // Creates executor without threads. You'll need to explicitly call `RunAdditionalThreads` + // to add threads to underlying thread pool. + // + TLocalExecutor(); + ~TLocalExecutor(); + + int GetQueueSize() const noexcept; + int GetMPQueueSize() const noexcept; + int GetLPQueueSize() const noexcept; + void ClearLPQueue(); + + // 0-based TLocalExecutor worker thread identification + int GetWorkerThreadId() const noexcept override; + int GetThreadCount() const noexcept override; + + // **Add** threads to underlying thread pool. + // + // @param threadCount Number of threads to add. + void RunAdditionalThreads(int threadCount); + + // Add task for further execution. + // + // @param exec Task description. + // @param id Task argument. + // @param flags Bitmask composed by `HIGH_PRIORITY`, `MED_PRIORITY`, `LOW_PRIORITY` + // and `WAIT_COMPLETE`. + void Exec(TIntrusivePtr<ILocallyExecutable> exec, int id, int flags) override; + + // Add tasks range for further execution. + // + // @param exec Task description. + // @param firstId, lastId Task arguments [firstId, lastId) + // @param flags Same as for `Exec`. + void ExecRange(TIntrusivePtr<ILocallyExecutable> exec, int firstId, int lastId, int flags) override; + + using ILocalExecutor::Exec; + using ILocalExecutor::ExecRange; + + private: + class TImpl; + THolder<TImpl> Impl_; + }; + + static inline TLocalExecutor& LocalExecutor() { + return *Singleton<TLocalExecutor>(); + } + + template <typename TBody> + inline void ParallelFor(ILocalExecutor& executor, ui32 from, ui32 to, TBody&& body) { + ILocalExecutor::TExecRangeParams params(from, to); + params.SetBlockCountToThreadCount(); + executor.ExecRange(std::forward<TBody>(body), params, TLocalExecutor::WAIT_COMPLETE); + } + + template <typename TBody> + inline void ParallelFor(ui32 from, ui32 to, TBody&& body) { + ParallelFor(LocalExecutor(), from, to, std::forward<TBody>(body)); + } + + template <typename TBody> + inline void AsyncParallelFor(ui32 from, ui32 to, TBody&& body) { + ILocalExecutor::TExecRangeParams params(from, to); + params.SetBlockCountToThreadCount(); + LocalExecutor().ExecRange(std::forward<TBody>(body), params, 0); + } +} diff --git a/library/cpp/threading/local_executor/tbb_local_executor.cpp b/library/cpp/threading/local_executor/tbb_local_executor.cpp new file mode 100644 index 00000000000..65d66594438 --- /dev/null +++ b/library/cpp/threading/local_executor/tbb_local_executor.cpp @@ -0,0 +1,53 @@ +#include "tbb_local_executor.h" + +template <bool RespectTls> +void NPar::TTbbLocalExecutor<RespectTls>::SubmitAsyncTasks(TLocallyExecutableFunction exec, int firstId, int lastId) { + for (int i = firstId; i < lastId; ++i) { + Group.run([=] { exec(i); }); + } +} + +template <bool RespectTls> +int NPar::TTbbLocalExecutor<RespectTls>::GetThreadCount() const noexcept { + return NumberOfTbbThreads - 1; +} + +template <bool RespectTls> +int NPar::TTbbLocalExecutor<RespectTls>::GetWorkerThreadId() const noexcept { + return TbbArena.execute([] { + return tbb::this_task_arena::current_thread_index(); + }); +} + +template <bool RespectTls> +void NPar::TTbbLocalExecutor<RespectTls>::Exec(TIntrusivePtr<ILocallyExecutable> exec, int id, int flags) { + if (flags & WAIT_COMPLETE) { + exec->LocalExec(id); + } else { + TbbArena.execute([=] { + SubmitAsyncTasks([=] (int id) { exec->LocalExec(id); }, id, id + 1); + }); + } +} + +template <bool RespectTls> +void NPar::TTbbLocalExecutor<RespectTls>::ExecRange(TIntrusivePtr<ILocallyExecutable> exec, int firstId, int lastId, int flags) { + if (flags & WAIT_COMPLETE) { + TbbArena.execute([=] { + if (RespectTls) { + tbb::this_task_arena::isolate([=]{ + tbb::parallel_for(firstId, lastId, [=] (int id) { exec->LocalExec(id); }); + }); + } else { + tbb::parallel_for(firstId, lastId, [=] (int id) { exec->LocalExec(id); }); + } + }); + } else { + TbbArena.execute([=] { + SubmitAsyncTasks([=] (int id) { exec->LocalExec(id); }, firstId, lastId); + }); + } +} + +template class NPar::TTbbLocalExecutor<true>; +template class NPar::TTbbLocalExecutor<false>; diff --git a/library/cpp/threading/local_executor/tbb_local_executor.h b/library/cpp/threading/local_executor/tbb_local_executor.h new file mode 100644 index 00000000000..8d790db18c8 --- /dev/null +++ b/library/cpp/threading/local_executor/tbb_local_executor.h @@ -0,0 +1,49 @@ +#pragma once + +#include "local_executor.h" +#define __TBB_TASK_ISOLATION 1 +#define __TBB_NO_IMPLICIT_LINKAGE 1 + +#include <contrib/libs/tbb/include/tbb/blocked_range.h> +#include <contrib/libs/tbb/include/tbb/parallel_for.h> +#include <contrib/libs/tbb/include/tbb/task_arena.h> +#include <contrib/libs/tbb/include/tbb/task_group.h> + +namespace NPar { + template <bool RespectTls = false> + class TTbbLocalExecutor final: public ILocalExecutor { + public: + TTbbLocalExecutor(int nThreads) + : ILocalExecutor() + , TbbArena(nThreads) + , NumberOfTbbThreads(nThreads) {} + ~TTbbLocalExecutor() noexcept override {} + + // 0-based ILocalExecutor worker thread identification + virtual int GetWorkerThreadId() const noexcept override; + virtual int GetThreadCount() const noexcept override; + + // Add task for further execution. + // + // @param exec Task description. + // @param id Task argument. + // @param flags Bitmask composed by `HIGH_PRIORITY`, `MED_PRIORITY`, `LOW_PRIORITY` + // and `WAIT_COMPLETE`. + virtual void Exec(TIntrusivePtr<ILocallyExecutable> exec, int id, int flags) override; + + // Add tasks range for further execution. + // + // @param exec Task description. + // @param firstId, lastId Task arguments [firstId, lastId) + // @param flags Same as for `Exec`. + virtual void ExecRange(TIntrusivePtr<ILocallyExecutable> exec, int firstId, int lastId, int flags) override; + + // Submit tasks for async run + void SubmitAsyncTasks(TLocallyExecutableFunction exec, int firstId, int lastId); + + private: + mutable tbb::task_arena TbbArena; + tbb::task_group Group; + int NumberOfTbbThreads; + }; +} diff --git a/library/cpp/threading/local_executor/ut/local_executor_ut.cpp b/library/cpp/threading/local_executor/ut/local_executor_ut.cpp new file mode 100644 index 00000000000..ac5737717cd --- /dev/null +++ b/library/cpp/threading/local_executor/ut/local_executor_ut.cpp @@ -0,0 +1,371 @@ +#include <library/cpp/threading/local_executor/local_executor.h> +#include <library/cpp/threading/future/future.h> + +#include <library/cpp/testing/unittest/registar.h> +#include <util/system/mutex.h> +#include <util/system/rwlock.h> +#include <util/generic/algorithm.h> + +using namespace NPar; + +class TTestException: public yexception { +}; + +static const int DefaultThreadsCount = 41; +static const int DefaultRangeSize = 999; + +Y_UNIT_TEST_SUITE(ExecRangeWithFutures){ + bool AllOf(const TVector<int>& vec, int value){ + return AllOf(vec, [value](int element) { return value == element; }); +} + +void AsyncRunAndWaitFuturesReady(int rangeSize, int threads) { + TLocalExecutor localExecutor; + localExecutor.RunAdditionalThreads(threads); + TAtomic signal = 0; + TVector<int> data(rangeSize, 0); + TVector<NThreading::TFuture<void>> futures = localExecutor.ExecRangeWithFutures([&signal, &data](int i) { + UNIT_ASSERT(data[i] == 0); + while (AtomicGet(signal) == 0) + ; + data[i] += 1; + }, + 0, rangeSize, TLocalExecutor::HIGH_PRIORITY); + UNIT_ASSERT(AllOf(data, 0)); + for (auto& future : futures) + UNIT_ASSERT(!future.HasValue()); + AtomicSet(signal, 1); + for (auto& future : futures) { + future.GetValueSync(); + } + UNIT_ASSERT(AllOf(data, 1)); +} + +Y_UNIT_TEST(AsyncRunRangeAndWaitFuturesReady) { + AsyncRunAndWaitFuturesReady(DefaultRangeSize, DefaultThreadsCount); +} + +Y_UNIT_TEST(AsyncRunOneTaskAndWaitFuturesReady) { + AsyncRunAndWaitFuturesReady(1, DefaultThreadsCount); +} + +Y_UNIT_TEST(AsyncRunRangeAndWaitFuturesReadyOneExtraThread) { + AsyncRunAndWaitFuturesReady(DefaultRangeSize, 1); +} + +Y_UNIT_TEST(AsyncRunOneThreadAndWaitFuturesReadyOneExtraThread) { + AsyncRunAndWaitFuturesReady(1, 1); +} + +Y_UNIT_TEST(AsyncRunTwoRangesAndWaitFuturesReady) { + TLocalExecutor localExecutor; + localExecutor.RunAdditionalThreads(DefaultThreadsCount); + TAtomic signal = 0; + TVector<int> data1(DefaultRangeSize, 0); + TVector<NThreading::TFuture<void>> futures1 = localExecutor.ExecRangeWithFutures([&signal, &data1](int i) { + UNIT_ASSERT(data1[i] == 0); + while (AtomicGet(signal) == 0) + ; + data1[i] += 1; + }, + 0, DefaultRangeSize, TLocalExecutor::HIGH_PRIORITY); + TVector<int> data2(DefaultRangeSize, 0); + TVector<NThreading::TFuture<void>> futures2 = localExecutor.ExecRangeWithFutures([&signal, &data2](int i) { + UNIT_ASSERT(data2[i] == 0); + while (AtomicGet(signal) == 0) + ; + data2[i] += 2; + }, + 0, DefaultRangeSize, TLocalExecutor::HIGH_PRIORITY); + UNIT_ASSERT(AllOf(data1, 0)); + UNIT_ASSERT(AllOf(data2, 0)); + AtomicSet(signal, 1); + for (int i = 0; i < DefaultRangeSize; ++i) { + futures1[i].GetValueSync(); + futures2[i].GetValueSync(); + } + UNIT_ASSERT(AllOf(data1, 1)); + UNIT_ASSERT(AllOf(data2, 2)); +} + +void AsyncRunRangeAndWaitExceptions(int rangeSize, int threadsCount) { + TLocalExecutor localExecutor; + localExecutor.RunAdditionalThreads(threadsCount); + TAtomic signal = 0; + TVector<int> data(rangeSize, 0); + TVector<NThreading::TFuture<void>> futures = localExecutor.ExecRangeWithFutures([&signal, &data](int i) { + UNIT_ASSERT(data[i] == 0); + while (AtomicGet(signal) == 0) + ; + data[i] += 1; + throw 10000 + i; + }, + 0, rangeSize, TLocalExecutor::HIGH_PRIORITY); + UNIT_ASSERT(AllOf(data, 0)); + UNIT_ASSERT(futures.ysize() == rangeSize); + AtomicSet(signal, 1); + int exceptionsCaught = 0; + for (int i = 0; i < rangeSize; ++i) { + try { + futures[i].GetValueSync(); + } catch (int& e) { + if (e == 10000 + i) { + ++exceptionsCaught; + } + } + } + UNIT_ASSERT(exceptionsCaught == rangeSize); + UNIT_ASSERT(AllOf(data, 1)); +} + +Y_UNIT_TEST(AsyncRunRangeAndWaitExceptions) { + AsyncRunRangeAndWaitExceptions(DefaultRangeSize, DefaultThreadsCount); +} + +Y_UNIT_TEST(AsyncRunOneTaskAndWaitExceptions) { + AsyncRunRangeAndWaitExceptions(1, DefaultThreadsCount); +} + +Y_UNIT_TEST(AsyncRunRangeAndWaitExceptionsOneExtraThread) { + AsyncRunRangeAndWaitExceptions(DefaultRangeSize, 1); +} + +Y_UNIT_TEST(AsyncRunOneTaskAndWaitExceptionsOneExtraThread) { + AsyncRunRangeAndWaitExceptions(1, 1); +} + +Y_UNIT_TEST(AsyncRunTwoRangesAndWaitExceptions) { + TLocalExecutor localExecutor; + localExecutor.RunAdditionalThreads(DefaultThreadsCount); + TAtomic signal = 0; + TVector<int> data1(DefaultRangeSize, 0); + TVector<NThreading::TFuture<void>> futures1 = localExecutor.ExecRangeWithFutures([&signal, &data1](int i) { + UNIT_ASSERT(data1[i] == 0); + while (AtomicGet(signal) == 0) + ; + data1[i] += 1; + throw 15000 + i; + }, + 0, DefaultRangeSize, TLocalExecutor::LOW_PRIORITY); + TVector<int> data2(DefaultRangeSize, 0); + TVector<NThreading::TFuture<void>> futures2 = localExecutor.ExecRangeWithFutures([&signal, &data2](int i) { + UNIT_ASSERT(data2[i] == 0); + while (AtomicGet(signal) == 0) + ; + data2[i] += 2; + throw 16000 + i; + }, + 0, DefaultRangeSize, TLocalExecutor::HIGH_PRIORITY); + + UNIT_ASSERT(AllOf(data1, 0)); + UNIT_ASSERT(AllOf(data2, 0)); + UNIT_ASSERT(futures1.size() == DefaultRangeSize); + UNIT_ASSERT(futures2.size() == DefaultRangeSize); + AtomicSet(signal, 1); + int exceptionsCaught = 0; + for (int i = 0; i < DefaultRangeSize; ++i) { + try { + futures1[i].GetValueSync(); + } catch (int& e) { + if (e == 15000 + i) { + ++exceptionsCaught; + } + } + try { + futures2[i].GetValueSync(); + } catch (int& e) { + if (e == 16000 + i) { + ++exceptionsCaught; + } + } + } + UNIT_ASSERT(exceptionsCaught == 2 * DefaultRangeSize); + UNIT_ASSERT(AllOf(data1, 1)); + UNIT_ASSERT(AllOf(data2, 2)); +} + +void RunRangeAndCheckExceptionsWithWaitComplete(int rangeSize, int threadsCount) { + TLocalExecutor localExecutor; + localExecutor.RunAdditionalThreads(threadsCount); + TVector<int> data(rangeSize, 0); + TVector<NThreading::TFuture<void>> futures = localExecutor.ExecRangeWithFutures([&data](int i) { + UNIT_ASSERT(data[i] == 0); + data[i] += 1; + throw 30000 + i; + }, + 0, rangeSize, TLocalExecutor::EFlags::WAIT_COMPLETE); + UNIT_ASSERT(AllOf(data, 1)); + int exceptionsCaught = 0; + for (int i = 0; i < rangeSize; ++i) { + try { + futures[i].GetValueSync(); + } catch (int& e) { + if (e == 30000 + i) { + ++exceptionsCaught; + } + } + } + UNIT_ASSERT(exceptionsCaught == rangeSize); + UNIT_ASSERT(AllOf(data, 1)); +} + +Y_UNIT_TEST(RunRangeAndCheckExceptionsWithWaitComplete) { + RunRangeAndCheckExceptionsWithWaitComplete(DefaultRangeSize, DefaultThreadsCount); +} + +Y_UNIT_TEST(RunOneAndCheckExceptionsWithWaitComplete) { + RunRangeAndCheckExceptionsWithWaitComplete(1, DefaultThreadsCount); +} + +Y_UNIT_TEST(RunRangeAndCheckExceptionsWithWaitCompleteOneExtraThread) { + RunRangeAndCheckExceptionsWithWaitComplete(DefaultRangeSize, 1); +} + +Y_UNIT_TEST(RunOneAndCheckExceptionsWithWaitCompleteOneExtraThread) { + RunRangeAndCheckExceptionsWithWaitComplete(1, 1); +} + +Y_UNIT_TEST(RunRangeAndCheckExceptionsWithWaitCompleteZeroExtraThreads) { + RunRangeAndCheckExceptionsWithWaitComplete(DefaultRangeSize, 0); +} + +Y_UNIT_TEST(RunOneAndCheckExceptionsWithWaitCompleteZeroExtraThreads) { + RunRangeAndCheckExceptionsWithWaitComplete(1, 0); +} +} +; + +Y_UNIT_TEST_SUITE(ExecRangeWithThrow){ + void RunParallelWhichThrowsTTestException(int rangeStart, int rangeSize, int threadsCount, int flags, TAtomic& processed){ + AtomicSet(processed, 0); +TLocalExecutor localExecutor; +localExecutor.RunAdditionalThreads(threadsCount); +localExecutor.ExecRangeWithThrow([&processed](int) { + AtomicAdd(processed, 1); + throw TTestException(); +}, + rangeStart, rangeStart + rangeSize, flags); +} + +Y_UNIT_TEST(RunParallelWhichThrowsTTestException) { + TAtomic processed = 0; + UNIT_ASSERT_EXCEPTION( + RunParallelWhichThrowsTTestException(10, 40, DefaultThreadsCount, + TLocalExecutor::EFlags::WAIT_COMPLETE, processed), + TTestException); + UNIT_ASSERT(AtomicGet(processed) == 40); +} + +void ThrowAndCatchTTestException(int rangeSize, int threadsCount, int flags) { + TAtomic processed = 0; + UNIT_ASSERT_EXCEPTION( + RunParallelWhichThrowsTTestException(0, rangeSize, threadsCount, flags, processed), + TTestException); + UNIT_ASSERT(AtomicGet(processed) == rangeSize); +} + +Y_UNIT_TEST(ThrowAndCatchTTestExceptionLowPriority) { + ThrowAndCatchTTestException(DefaultRangeSize, DefaultThreadsCount, + TLocalExecutor::EFlags::WAIT_COMPLETE | TLocalExecutor::EFlags::LOW_PRIORITY); +} + +Y_UNIT_TEST(ThrowAndCatchTTestExceptionMedPriority) { + ThrowAndCatchTTestException(DefaultRangeSize, DefaultThreadsCount, + TLocalExecutor::EFlags::WAIT_COMPLETE | TLocalExecutor::EFlags::MED_PRIORITY); +} + +Y_UNIT_TEST(ThrowAndCatchTTestExceptionHighPriority) { + ThrowAndCatchTTestException(DefaultRangeSize, DefaultThreadsCount, + TLocalExecutor::EFlags::WAIT_COMPLETE | TLocalExecutor::EFlags::HIGH_PRIORITY); +} + +Y_UNIT_TEST(ThrowAndCatchTTestExceptionWaitComplete) { + ThrowAndCatchTTestException(DefaultRangeSize, DefaultThreadsCount, + TLocalExecutor::EFlags::WAIT_COMPLETE); +} + +Y_UNIT_TEST(RethrowExeptionSequentialWaitComplete) { + ThrowAndCatchTTestException(DefaultRangeSize, 0, + TLocalExecutor::EFlags::WAIT_COMPLETE); +} + +Y_UNIT_TEST(RethrowExeptionOneExtraThreadWaitComplete) { + ThrowAndCatchTTestException(DefaultRangeSize, 1, + TLocalExecutor::EFlags::WAIT_COMPLETE); +} + +void ThrowsTTestExceptionFromNested(TLocalExecutor& localExecutor) { + localExecutor.ExecRangeWithThrow([](int) { + throw TTestException(); + }, + 0, 10, TLocalExecutor::EFlags::WAIT_COMPLETE); +} + +void CatchTTestExceptionFromNested(TAtomic& processed1, TAtomic& processed2) { + TLocalExecutor localExecutor; + localExecutor.RunAdditionalThreads(DefaultThreadsCount); + localExecutor.ExecRangeWithThrow([&processed1, &processed2, &localExecutor](int) { + AtomicAdd(processed1, 1); + UNIT_ASSERT_EXCEPTION( + ThrowsTTestExceptionFromNested(localExecutor), + TTestException); + AtomicAdd(processed2, 1); + }, + 0, DefaultRangeSize, TLocalExecutor::EFlags::WAIT_COMPLETE); +} + +Y_UNIT_TEST(NestedParallelExceptionsDoNotLeak) { + TAtomic processed1 = 0; + TAtomic processed2 = 0; + UNIT_ASSERT_NO_EXCEPTION( + CatchTTestExceptionFromNested(processed1, processed2)); + UNIT_ASSERT_EQUAL(AtomicGet(processed1), DefaultRangeSize); + UNIT_ASSERT_EQUAL(AtomicGet(processed2), DefaultRangeSize); +} +} +; + +Y_UNIT_TEST_SUITE(ExecLargeRangeWithThrow){ + + constexpr int LARGE_COUNT = 128 * (1 << 20); + + static auto IsValue(char v) { + return [=](char c) { return c == v; }; + } + + Y_UNIT_TEST(ExecLargeRangeNoExceptions) { + TVector<char> tasks(LARGE_COUNT); + + TLocalExecutor localExecutor; + localExecutor.RunAdditionalThreads(DefaultThreadsCount); + + localExecutor.ExecRangeBlockedWithThrow([&tasks](int i) { + tasks[i] = 1; + }, 0, tasks.size(), 0, TLocalExecutor::EFlags::WAIT_COMPLETE); + UNIT_ASSERT(AllOf(tasks, IsValue(1))); + + + localExecutor.ExecRangeBlockedWithThrow([&tasks](int i) { + tasks[i] += 1; + }, 0, tasks.size(), 128, TLocalExecutor::EFlags::WAIT_COMPLETE); + UNIT_ASSERT(AllOf(tasks, IsValue(2))); + } + + Y_UNIT_TEST(ExecLargeRangeWithException) { + TVector<char> tasks(LARGE_COUNT); + + TLocalExecutor localExecutor; + localExecutor.RunAdditionalThreads(DefaultThreadsCount); + + Fill(tasks.begin(), tasks.end(), 0); + UNIT_ASSERT_EXCEPTION( + localExecutor.ExecRangeBlockedWithThrow([&tasks](int i) { + tasks[i] += 1; + if (i == LARGE_COUNT / 2) { + throw TTestException(); + } + }, 0, tasks.size(), 0, TLocalExecutor::EFlags::WAIT_COMPLETE), + TTestException + ); + } +}; diff --git a/library/cpp/threading/local_executor/ut/ya.make b/library/cpp/threading/local_executor/ut/ya.make new file mode 100644 index 00000000000..be579a5ca06 --- /dev/null +++ b/library/cpp/threading/local_executor/ut/ya.make @@ -0,0 +1,12 @@ +OWNER( + g:matrixnet + gulin +) + +UNITTEST_FOR(library/cpp/threading/local_executor) + +SRCS( + local_executor_ut.cpp +) + +END() diff --git a/library/cpp/threading/local_executor/ya.make b/library/cpp/threading/local_executor/ya.make new file mode 100644 index 00000000000..df210f92bb6 --- /dev/null +++ b/library/cpp/threading/local_executor/ya.make @@ -0,0 +1,20 @@ +OWNER( + g:matrixnet + gulin + kirillovs + espetrov +) + +LIBRARY() + +SRCS( + local_executor.cpp + tbb_local_executor.cpp +) + +PEERDIR( + contrib/libs/tbb + library/cpp/threading/future +) + +END() |