author    Devtools Arcadia <[email protected]>  2022-02-07 18:08:42 +0300
committer Devtools Arcadia <[email protected]>  2022-02-07 18:08:42 +0300
commit    1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch)
tree      e26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/threading/local_executor

    intermediate changes
    ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/threading/local_executor')
-rw-r--r--  library/cpp/threading/local_executor/README.md                  74
-rw-r--r--  library/cpp/threading/local_executor/local_executor.cpp        369
-rw-r--r--  library/cpp/threading/local_executor/local_executor.h          294
-rw-r--r--  library/cpp/threading/local_executor/tbb_local_executor.cpp     53
-rw-r--r--  library/cpp/threading/local_executor/tbb_local_executor.h       49
-rw-r--r--  library/cpp/threading/local_executor/ut/local_executor_ut.cpp  371
-rw-r--r--  library/cpp/threading/local_executor/ut/ya.make                 12
-rw-r--r--  library/cpp/threading/local_executor/ya.make                    20
8 files changed, 1242 insertions, 0 deletions
diff --git a/library/cpp/threading/local_executor/README.md b/library/cpp/threading/local_executor/README.md
new file mode 100644
index 00000000000..aaad2e2986c
--- /dev/null
+++ b/library/cpp/threading/local_executor/README.md
@@ -0,0 +1,74 @@
+# Library for parallel task execution in a thread pool
+
+This library makes it easy to parallelize existing code and loops.
+It provides the `NPar::TLocalExecutor` class and the `NPar::LocalExecutor()` singleton accessor.
+Initially, `TLocalExecutor` has no threads in its thread pool; all async tasks are queued and executed later, once threads are added.
+Each task must be either a subclass of `NPar::ILocallyExecutable` or a function compatible with `std::function<void(int)>`.
+
+## TLocalExecutor methods
+
+`TLocalExecutor::Run(int threadcount)` - adds threads to the thread pool (**WARNING!** `Run(threadcount)` will *add* `threadcount` threads to the pool)
+
+`void TLocalExecutor::Exec(TLocallyExecutableFunction exec, int id, int flags)` - runs one task, passing `id` as the task function's input; `flags` is a bitmask composed of:
+
+- `TLocalExecutor::HIGH_PRIORITY = 0` - put task in high priority queue
+- `TLocalExecutor::MED_PRIORITY = 1` - put task in medium priority queue
+- `TLocalExecutor::LOW_PRIORITY = 2` - put task in low priority queue
+- `TLocalExecutor::WAIT_COMPLETE = 4` - wait for task completion
+
+`void TLocalExecutor::ExecRange(TLocallyExecutableFunction exec, TExecRangeParams blockParams, int flags)` - runs a range of tasks `[TExecRangeParams::FirstId, TExecRangeParams::LastId)`.
+
+`flags` is the same as for `TLocalExecutor::Exec`.
+
+`TExecRangeParams` is a structure that describes the range.
+By default, each task is executed separately; threads from the thread pool
+take tasks on a first-come, first-served basis.
+
+It is also possible to partition the range of tasks into consecutive blocks and execute each block as one bigger task.
+`TExecRangeParams::SetBlockCountToThreadCount()` creates as many blocks as there are
+threads in the thread pool, so each thread executes an approximately equal number
+of tasks from the range.
+
+`TExecRangeParams::SetBlockSize()` partitions the range of tasks into consecutive blocks
+of approximately the given size; `TExecRangeParams::SetBlockCount()` partitions it into
+the given number of approximately equal-sized blocks. See the blocked example below.
+
+## Examples
+
+### Simple task async exec with medium priority
+
+```cpp
+using namespace NPar;
+
+LocalExecutor().Run(4);
+TEvent event;
+LocalExecutor().Exec([&event](int) {
+ SomeFunc();
+ event.Signal();
+}, 0, TLocalExecutor::MED_PRIORITY);
+
+SomeOtherCode();
+event.WaitI();
+```
+
+### Execute task range and wait completion
+
+```cpp
+using namespace NPar;
+
+LocalExecutor().Run(4);
+LocalExecutor().ExecRange([](int id) {
+ SomeFunc(id);
+}, TExecRangeParams(0, 10), TLocalExecutor::WAIT_COMPLETE | TLocalExecutor::MED_PRIORITY);
+```
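+
+### Execute task range in consecutive blocks
+
+A minimal sketch of blocked execution (assuming `SomeFunc` is your own function): the range is split into one block per pool thread, and each block runs as a single bigger task.
+
+```cpp
+using namespace NPar;
+
+LocalExecutor().Run(4);
+TExecRangeParams params(0, 1000);
+params.SetBlockCountToThreadCount(); // or SetBlockSize(n) / SetBlockCount(n)
+LocalExecutor().ExecRange([](int id) {
+    SomeFunc(id);
+}, params, TLocalExecutor::WAIT_COMPLETE | TLocalExecutor::MED_PRIORITY);
+```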
+
+### Exception handling
+
+By default, if an uncaught exception arises in a task run through the local executor, `std::terminate()` is called immediately; the exception is printed to stderr before termination. Best practice is to handle exceptions inside the task, or to avoid throwing exceptions at all for performance reasons.
+
+However, if you'd like to handle and/or rethrow exceptions outside of a range, you can use `ExecRangeWithFutures()`.
+It returns a vector of `LastId - FirstId` elements, where the i-th element is a `TFuture` corresponding to the task with `id = FirstId + i`.
+Use the element's `.HasValue()` method to check asynchronously whether the corresponding task is complete.
+Use `.GetValue()` or `.GetValueSync()` to wait for completion of the corresponding task; both also rethrow any exception that arose during execution of the task.
+
+You may also use `ExecRangeWithThrow()` to simply propagate an exception from a range if one appears. It rethrows the exception from the task with the minimal id if any exception exists, and guarantees normal flow otherwise.
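+
+A minimal sketch of waiting on futures (assuming `MayThrow` is your own function that may throw):
+
+```cpp
+using namespace NPar;
+
+LocalExecutor().Run(4);
+auto futures = LocalExecutor().ExecRangeWithFutures([](int id) {
+    MayThrow(id);
+}, 0, 10, TLocalExecutor::MED_PRIORITY);
+for (auto& future : futures) {
+    try {
+        future.GetValueSync(); // rethrows the task's exception, if any
+    } catch (const std::exception& e) {
+        Cerr << e.what() << Endl;
+    }
+}
+```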
diff --git a/library/cpp/threading/local_executor/local_executor.cpp b/library/cpp/threading/local_executor/local_executor.cpp
new file mode 100644
index 00000000000..1d3fbb4bf44
--- /dev/null
+++ b/library/cpp/threading/local_executor/local_executor.cpp
@@ -0,0 +1,369 @@
+#include "local_executor.h"
+
+#include <library/cpp/threading/future/future.h>
+
+#include <util/generic/utility.h>
+#include <util/system/atomic.h>
+#include <util/system/event.h>
+#include <util/system/thread.h>
+#include <util/system/tls.h>
+#include <util/system/yield.h>
+#include <util/thread/lfqueue.h>
+
+#include <utility>
+
+#ifdef _win_
+static void RegularYield() {
+}
+#else
+// unix actually has cooperative multitasking! :)
+// without this function the program runs slower and the system lags for some magic reason
+static void RegularYield() {
+ SchedYield();
+}
+#endif
+
+namespace {
+ struct TFunctionWrapper : NPar::ILocallyExecutable {
+ NPar::TLocallyExecutableFunction Exec;
+ TFunctionWrapper(NPar::TLocallyExecutableFunction exec)
+ : Exec(std::move(exec))
+ {
+ }
+ void LocalExec(int id) override {
+ Exec(id);
+ }
+ };
+
+ class TFunctionWrapperWithPromise: public NPar::ILocallyExecutable {
+ private:
+ NPar::TLocallyExecutableFunction Exec;
+ int FirstId, LastId;
+ TVector<NThreading::TPromise<void>> Promises;
+
+ public:
+ TFunctionWrapperWithPromise(NPar::TLocallyExecutableFunction exec, int firstId, int lastId)
+ : Exec(std::move(exec))
+ , FirstId(firstId)
+ , LastId(lastId)
+ {
+ Y_ASSERT(FirstId <= LastId);
+ const int rangeSize = LastId - FirstId;
+            Promises.resize(rangeSize);
+            // Each task must get its own promise, so construct a distinct one per element.
+            for (auto& promise : Promises) {
+                promise = NThreading::NewPromise();
+            }
+ }
+
+ void LocalExec(int id) override {
+ Y_ASSERT(FirstId <= id && id < LastId);
+ NThreading::NImpl::SetValue(Promises[id - FirstId], [=] { Exec(id); });
+ }
+
+ TVector<NThreading::TFuture<void>> GetFutures() const {
+ TVector<NThreading::TFuture<void>> out;
+ out.reserve(Promises.ysize());
+ for (auto& promise : Promises) {
+ out.push_back(promise.GetFuture());
+ }
+ return out;
+ }
+ };
+
+ struct TSingleJob {
+ TIntrusivePtr<NPar::ILocallyExecutable> Exec;
+ int Id{0};
+
+ TSingleJob() = default;
+ TSingleJob(TIntrusivePtr<NPar::ILocallyExecutable> exec, int id)
+ : Exec(std::move(exec))
+ , Id(id)
+ {
+ }
+ };
+
+ class TLocalRangeExecutor: public NPar::ILocallyExecutable {
+ TIntrusivePtr<NPar::ILocallyExecutable> Exec;
+ alignas(64) TAtomic Counter;
+ alignas(64) TAtomic WorkerCount;
+ int LastId;
+
+ void LocalExec(int) override {
+ AtomicAdd(WorkerCount, 1);
+ for (;;) {
+ if (!DoSingleOp())
+ break;
+ }
+ AtomicAdd(WorkerCount, -1);
+ }
+
+ public:
+ TLocalRangeExecutor(TIntrusivePtr<ILocallyExecutable> exec, int firstId, int lastId)
+ : Exec(std::move(exec))
+ , Counter(firstId)
+ , WorkerCount(0)
+ , LastId(lastId)
+ {
+ }
+ bool DoSingleOp() {
+ const int id = AtomicAdd(Counter, 1) - 1;
+ if (id >= LastId)
+ return false;
+ Exec->LocalExec(id);
+ RegularYield();
+ return true;
+ }
+ void WaitComplete() {
+ while (AtomicGet(WorkerCount) > 0)
+ RegularYield();
+ }
+ int GetRangeSize() const {
+            return Max<int>(LastId - AtomicGet(Counter), 0);
+ }
+ };
+
+}
+
+//////////////////////////////////////////////////////////////////////////
+class NPar::TLocalExecutor::TImpl {
+public:
+ TLockFreeQueue<TSingleJob> JobQueue;
+ TLockFreeQueue<TSingleJob> MedJobQueue;
+ TLockFreeQueue<TSingleJob> LowJobQueue;
+ alignas(64) TSystemEvent HasJob;
+
+ TAtomic ThreadCount{0};
+ alignas(64) TAtomic QueueSize{0};
+ TAtomic MPQueueSize{0};
+ TAtomic LPQueueSize{0};
+ TAtomic ThreadId{0};
+
+    Y_THREAD(int) CurrentTaskPriority;
+    Y_THREAD(int) WorkerThreadId;
+
+ static void* HostWorkerThread(void* p);
+ bool GetJob(TSingleJob* job);
+ void RunNewThread();
+ void LaunchRange(TIntrusivePtr<TLocalRangeExecutor> execRange, int queueSizeLimit,
+ TAtomic* queueSize, TLockFreeQueue<TSingleJob>* jobQueue);
+
+ TImpl() = default;
+ ~TImpl();
+};
+
+NPar::TLocalExecutor::TImpl::~TImpl() {
+ AtomicAdd(QueueSize, 1);
+ JobQueue.Enqueue(TSingleJob(nullptr, 0));
+ HasJob.Signal();
+ while (AtomicGet(ThreadCount)) {
+ ThreadYield();
+ }
+}
+
+void* NPar::TLocalExecutor::TImpl::HostWorkerThread(void* p) {
+ static const int FAST_ITERATIONS = 200;
+
+ auto* const ctx = (TImpl*)p;
+ TThread::SetCurrentThreadName("ParLocalExecutor");
+ ctx->WorkerThreadId = AtomicAdd(ctx->ThreadId, 1);
+ for (bool cont = true; cont;) {
+ TSingleJob job;
+ bool gotJob = false;
+ for (int iter = 0; iter < FAST_ITERATIONS; ++iter) {
+ if (ctx->GetJob(&job)) {
+ gotJob = true;
+ break;
+ }
+ }
+ if (!gotJob) {
+ ctx->HasJob.Reset();
+ if (!ctx->GetJob(&job)) {
+ ctx->HasJob.Wait();
+ continue;
+ }
+ }
+ if (job.Exec.Get()) {
+ job.Exec->LocalExec(job.Id);
+ RegularYield();
+ } else {
+ AtomicAdd(ctx->QueueSize, 1);
+ ctx->JobQueue.Enqueue(job);
+ ctx->HasJob.Signal();
+ cont = false;
+ }
+ }
+ AtomicAdd(ctx->ThreadCount, -1);
+ return nullptr;
+}
+
+bool NPar::TLocalExecutor::TImpl::GetJob(TSingleJob* job) {
+ if (JobQueue.Dequeue(job)) {
+ CurrentTaskPriority = TLocalExecutor::HIGH_PRIORITY;
+ AtomicAdd(QueueSize, -1);
+ return true;
+ } else if (MedJobQueue.Dequeue(job)) {
+ CurrentTaskPriority = TLocalExecutor::MED_PRIORITY;
+ AtomicAdd(MPQueueSize, -1);
+ return true;
+ } else if (LowJobQueue.Dequeue(job)) {
+ CurrentTaskPriority = TLocalExecutor::LOW_PRIORITY;
+ AtomicAdd(LPQueueSize, -1);
+ return true;
+ }
+ return false;
+}
+
+void NPar::TLocalExecutor::TImpl::RunNewThread() {
+ AtomicAdd(ThreadCount, 1);
+ TThread thr(HostWorkerThread, this);
+ thr.Start();
+ thr.Detach();
+}
+
+void NPar::TLocalExecutor::TImpl::LaunchRange(TIntrusivePtr<TLocalRangeExecutor> rangeExec,
+ int queueSizeLimit,
+ TAtomic* queueSize,
+ TLockFreeQueue<TSingleJob>* jobQueue) {
+    int count = Min<int>(AtomicGet(ThreadCount) + 1, rangeExec->GetRangeSize());
+ if (queueSizeLimit >= 0 && AtomicGet(*queueSize) >= queueSizeLimit) {
+ return;
+ }
+ AtomicAdd(*queueSize, count);
+ jobQueue->EnqueueAll(TVector<TSingleJob>{size_t(count), TSingleJob(rangeExec, 0)});
+ HasJob.Signal();
+}
+
+NPar::TLocalExecutor::TLocalExecutor()
+ : Impl_{MakeHolder<TImpl>()} {
+}
+
+NPar::TLocalExecutor::~TLocalExecutor() = default;
+
+void NPar::TLocalExecutor::RunAdditionalThreads(int threadCount) {
+ for (int i = 0; i < threadCount; i++)
+ Impl_->RunNewThread();
+}
+
+void NPar::TLocalExecutor::Exec(TIntrusivePtr<ILocallyExecutable> exec, int id, int flags) {
+ Y_ASSERT((flags & WAIT_COMPLETE) == 0); // unsupported
+ int prior = Max<int>(Impl_->CurrentTaskPriority, flags & PRIORITY_MASK);
+ switch (prior) {
+ case HIGH_PRIORITY:
+ AtomicAdd(Impl_->QueueSize, 1);
+ Impl_->JobQueue.Enqueue(TSingleJob(std::move(exec), id));
+ break;
+ case MED_PRIORITY:
+ AtomicAdd(Impl_->MPQueueSize, 1);
+ Impl_->MedJobQueue.Enqueue(TSingleJob(std::move(exec), id));
+ break;
+ case LOW_PRIORITY:
+ AtomicAdd(Impl_->LPQueueSize, 1);
+ Impl_->LowJobQueue.Enqueue(TSingleJob(std::move(exec), id));
+ break;
+ default:
+ Y_ASSERT(0);
+ break;
+ }
+ Impl_->HasJob.Signal();
+}
+
+void NPar::ILocalExecutor::Exec(TLocallyExecutableFunction exec, int id, int flags) {
+ Exec(new TFunctionWrapper(std::move(exec)), id, flags);
+}
+
+void NPar::TLocalExecutor::ExecRange(TIntrusivePtr<ILocallyExecutable> exec, int firstId, int lastId, int flags) {
+ Y_ASSERT(lastId >= firstId);
+ if (TryExecRangeSequentially([=] (int id) { exec->LocalExec(id); }, firstId, lastId, flags)) {
+ return;
+ }
+ auto rangeExec = MakeIntrusive<TLocalRangeExecutor>(std::move(exec), firstId, lastId);
+ int queueSizeLimit = (flags & WAIT_COMPLETE) ? 10000 : -1;
+ int prior = Max<int>(Impl_->CurrentTaskPriority, flags & PRIORITY_MASK);
+ switch (prior) {
+ case HIGH_PRIORITY:
+ Impl_->LaunchRange(rangeExec, queueSizeLimit, &Impl_->QueueSize, &Impl_->JobQueue);
+ break;
+ case MED_PRIORITY:
+ Impl_->LaunchRange(rangeExec, queueSizeLimit, &Impl_->MPQueueSize, &Impl_->MedJobQueue);
+ break;
+ case LOW_PRIORITY:
+ Impl_->LaunchRange(rangeExec, queueSizeLimit, &Impl_->LPQueueSize, &Impl_->LowJobQueue);
+ break;
+ default:
+ Y_ASSERT(0);
+ break;
+ }
+ if (flags & WAIT_COMPLETE) {
+ int keepPrior = Impl_->CurrentTaskPriority;
+ Impl_->CurrentTaskPriority = prior;
+ while (rangeExec->DoSingleOp()) {
+ }
+ Impl_->CurrentTaskPriority = keepPrior;
+ rangeExec->WaitComplete();
+ }
+}
+
+void NPar::ILocalExecutor::ExecRange(TLocallyExecutableFunction exec, int firstId, int lastId, int flags) {
+ if (TryExecRangeSequentially(exec, firstId, lastId, flags)) {
+ return;
+ }
+ ExecRange(new TFunctionWrapper(exec), firstId, lastId, flags);
+}
+
+void NPar::ILocalExecutor::ExecRangeWithThrow(TLocallyExecutableFunction exec, int firstId, int lastId, int flags) {
+ Y_VERIFY((flags & WAIT_COMPLETE) != 0, "ExecRangeWithThrow() requires WAIT_COMPLETE to wait if exceptions arise.");
+ if (TryExecRangeSequentially(exec, firstId, lastId, flags)) {
+ return;
+ }
+ TVector<NThreading::TFuture<void>> currentRun = ExecRangeWithFutures(exec, firstId, lastId, flags);
+ for (auto& result : currentRun) {
+        result.GetValueSync(); // An exception will be rethrown if it exists. If there are several exceptions, only the one with the minimal id is rethrown.
+ }
+}
+
+TVector<NThreading::TFuture<void>>
+NPar::ILocalExecutor::ExecRangeWithFutures(TLocallyExecutableFunction exec, int firstId, int lastId, int flags) {
+ TFunctionWrapperWithPromise* execWrapper = new TFunctionWrapperWithPromise(exec, firstId, lastId);
+ TVector<NThreading::TFuture<void>> out = execWrapper->GetFutures();
+ ExecRange(execWrapper, firstId, lastId, flags);
+ return out;
+}
+
+void NPar::TLocalExecutor::ClearLPQueue() {
+ for (bool cont = true; cont;) {
+ cont = false;
+ TSingleJob job;
+ while (Impl_->LowJobQueue.Dequeue(&job)) {
+ AtomicAdd(Impl_->LPQueueSize, -1);
+ cont = true;
+ }
+ while (Impl_->MedJobQueue.Dequeue(&job)) {
+ AtomicAdd(Impl_->MPQueueSize, -1);
+ cont = true;
+ }
+ }
+}
+
+int NPar::TLocalExecutor::GetQueueSize() const noexcept {
+ return AtomicGet(Impl_->QueueSize);
+}
+
+int NPar::TLocalExecutor::GetMPQueueSize() const noexcept {
+ return AtomicGet(Impl_->MPQueueSize);
+}
+
+int NPar::TLocalExecutor::GetLPQueueSize() const noexcept {
+ return AtomicGet(Impl_->LPQueueSize);
+}
+
+int NPar::TLocalExecutor::GetWorkerThreadId() const noexcept {
+ return Impl_->WorkerThreadId;
+}
+
+int NPar::TLocalExecutor::GetThreadCount() const noexcept {
+ return AtomicGet(Impl_->ThreadCount);
+}
+
+//////////////////////////////////////////////////////////////////////////
diff --git a/library/cpp/threading/local_executor/local_executor.h b/library/cpp/threading/local_executor/local_executor.h
new file mode 100644
index 00000000000..c1c824f67cb
--- /dev/null
+++ b/library/cpp/threading/local_executor/local_executor.h
@@ -0,0 +1,294 @@
+#pragma once
+
+#include <library/cpp/threading/future/future.h>
+
+#include <util/generic/cast.h>
+#include <util/generic/fwd.h>
+#include <util/generic/noncopyable.h>
+#include <util/generic/ptr.h>
+#include <util/generic/singleton.h>
+#include <util/generic/ymath.h>
+
+#include <functional>
+
+namespace NPar {
+ struct ILocallyExecutable : virtual public TThrRefBase {
+        // Must be implemented by the end user to define the job that will be processed by one
+        // of the executor threads.
+        //
+        // @param id Job parameter, typically an index pointing into an array, or just
+        //     some dummy value, e.g. `0`.
+ virtual void LocalExec(int id) = 0;
+ };
+
+    // An alternative, simpler way of describing a job for the executor. The function argument
+    // has the same meaning as `id` in `ILocallyExecutable::LocalExec`.
+ //
+ using TLocallyExecutableFunction = std::function<void(int)>;
+
+ class ILocalExecutor: public TNonCopyable {
+ public:
+ ILocalExecutor() = default;
+ virtual ~ILocalExecutor() = default;
+
+ enum EFlags : int {
+ HIGH_PRIORITY = 0,
+ MED_PRIORITY = 1,
+ LOW_PRIORITY = 2,
+ PRIORITY_MASK = 3,
+ WAIT_COMPLETE = 4
+ };
+
+ // Add task for further execution.
+ //
+ // @param exec Task description.
+ // @param id Task argument.
+        // @param flags Bitmask composed of `HIGH_PRIORITY`, `MED_PRIORITY`, `LOW_PRIORITY`
+ // and `WAIT_COMPLETE`.
+ virtual void Exec(TIntrusivePtr<ILocallyExecutable> exec, int id, int flags) = 0;
+
+        // Add a range of tasks for further execution.
+ //
+ // @param exec Task description.
+ // @param firstId, lastId Task arguments [firstId, lastId)
+ // @param flags Same as for `Exec`.
+ virtual void ExecRange(TIntrusivePtr<ILocallyExecutable> exec, int firstId, int lastId, int flags) = 0;
+
+ // 0-based ILocalExecutor worker thread identification
+ virtual int GetWorkerThreadId() const noexcept = 0;
+ virtual int GetThreadCount() const noexcept = 0;
+
+ // Describes a range of tasks with parameters from integer range [FirstId, LastId).
+ //
+ class TExecRangeParams {
+ public:
+ template <typename TFirst, typename TLast>
+ TExecRangeParams(TFirst firstId, TLast lastId)
+ : FirstId(SafeIntegerCast<int>(firstId))
+ , LastId(SafeIntegerCast<int>(lastId))
+ {
+ Y_ASSERT(LastId >= FirstId);
+ SetBlockSize(1);
+ }
+ // Partition tasks into `blockCount` blocks of approximately equal size, each of which
+ // will be executed as a separate bigger task.
+ //
+ template <typename TBlockCount>
+ TExecRangeParams& SetBlockCount(TBlockCount blockCount) {
+ Y_ASSERT(SafeIntegerCast<int>(blockCount) > 0 || FirstId == LastId);
+ BlockSize = FirstId == LastId ? 0 : CeilDiv(LastId - FirstId, SafeIntegerCast<int>(blockCount));
+ BlockCount = BlockSize == 0 ? 0 : CeilDiv(LastId - FirstId, BlockSize);
+ BlockEqualToThreads = false;
+ return *this;
+ }
+ // Partition tasks into blocks of approximately `blockSize` size, each of which will
+ // be executed as a separate bigger task.
+ //
+ template <typename TBlockSize>
+ TExecRangeParams& SetBlockSize(TBlockSize blockSize) {
+ Y_ASSERT(SafeIntegerCast<int>(blockSize) > 0 || FirstId == LastId);
+ BlockSize = SafeIntegerCast<int>(blockSize);
+ BlockCount = BlockSize == 0 ? 0 : CeilDiv(LastId - FirstId, BlockSize);
+ BlockEqualToThreads = false;
+ return *this;
+ }
+ // Partition tasks into thread count blocks of approximately equal size, each of which
+ // will be executed as a separate bigger task.
+ //
+ TExecRangeParams& SetBlockCountToThreadCount() {
+ BlockEqualToThreads = true;
+ return *this;
+ }
+ int GetBlockCount() const {
+ Y_ASSERT(!BlockEqualToThreads);
+ return BlockCount;
+ }
+ int GetBlockSize() const {
+ Y_ASSERT(!BlockEqualToThreads);
+ return BlockSize;
+ }
+            bool GetBlockEqualToThreads() const {
+ return BlockEqualToThreads;
+ }
+
+ const int FirstId = 0;
+ const int LastId = 0;
+
+ private:
+ int BlockSize;
+ int BlockCount;
+ bool BlockEqualToThreads;
+ };
+
+ // `Exec` and `ExecRange` versions that accept functions.
+ //
+ void Exec(TLocallyExecutableFunction exec, int id, int flags);
+ void ExecRange(TLocallyExecutableFunction exec, int firstId, int lastId, int flags);
+
+    // Version of `ExecRange` that rethrows the exception from the task with the minimal id if
+    // at least one of the tasks threw an exception.
+ //
+ void ExecRangeWithThrow(TLocallyExecutableFunction exec, int firstId, int lastId, int flags);
+
+    // Version of `ExecRange` that returns a vector of futures, thus allowing any task to be
+    // retried if it fails.
+ //
+ TVector<NThreading::TFuture<void>> ExecRangeWithFutures(TLocallyExecutableFunction exec, int firstId, int lastId, int flags);
+
+ template <typename TBody>
+ static inline auto BlockedLoopBody(const TExecRangeParams& params, const TBody& body) {
+ return [=](int blockId) {
+ const int blockFirstId = params.FirstId + blockId * params.GetBlockSize();
+ const int blockLastId = Min(params.LastId, blockFirstId + params.GetBlockSize());
+ for (int i = blockFirstId; i < blockLastId; ++i) {
+ body(i);
+ }
+ };
+ }
+
+ template <typename TBody>
+ inline void ExecRange(TBody&& body, TExecRangeParams params, int flags) {
+ if (TryExecRangeSequentially(body, params.FirstId, params.LastId, flags)) {
+ return;
+ }
+ if (params.GetBlockEqualToThreads()) {
+ params.SetBlockCount(GetThreadCount() + ((flags & WAIT_COMPLETE) != 0)); // ThreadCount or ThreadCount+1 depending on WaitFlag
+ }
+ ExecRange(BlockedLoopBody(params, body), 0, params.GetBlockCount(), flags);
+ }
+
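+    // Blocked version of `ExecRangeWithThrow`: runs `body` over `[firstId, lastId)` in
+    // consecutive batches distributed across threads, waits for completion, and rethrows an
+    // exception if any task threw one. Pass 0 as the batch size to split the range evenly
+    // across threads. A minimal usage sketch (`executor` and `data` are hypothetical):
+    // ```
+    // executor.ExecRangeBlockedWithThrow([&](int i) { data[i] += 1; },
+    //     0, data.size(), /*batchSizeOrZeroForAutoBatchSize*/ 0, NPar::ILocalExecutor::WAIT_COMPLETE);
+    // ```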
+ template <typename TBody>
+ inline void ExecRangeBlockedWithThrow(TBody&& body, int firstId, int lastId, int batchSizeOrZeroForAutoBatchSize, int flags) {
+ if (firstId >= lastId) {
+ return;
+ }
+ const int threadCount = Max(GetThreadCount(), 1);
+ const int batchSize = batchSizeOrZeroForAutoBatchSize
+ ? batchSizeOrZeroForAutoBatchSize
+ : (lastId - firstId + threadCount - 1) / threadCount;
+ const int batchCount = (lastId - firstId + batchSize - 1) / batchSize;
+ const int batchCountPerThread = (batchCount + threadCount - 1) / threadCount;
+ auto states = ExecRangeWithFutures(
+ [=](int threadId) {
+ for (int batchIdPerThread = 0; batchIdPerThread < batchCountPerThread; ++batchIdPerThread) {
+ int batchId = batchIdPerThread * threadCount + threadId;
+ int begin = firstId + batchId * batchSize;
+ int end = Min(begin + batchSize, lastId);
+ for (int i = begin; i < end; ++i) {
+ body(i);
+ }
+ }
+ },
+ 0, threadCount, flags);
+ for (auto& state: states) {
+ state.GetValueSync(); // Re-throw exception if any.
+ }
+ }
+
+ template <typename TBody>
+ static inline bool TryExecRangeSequentially(TBody&& body, int firstId, int lastId, int flags) {
+ if (lastId == firstId) {
+ return true;
+ }
+ if ((flags & WAIT_COMPLETE) && lastId - firstId == 1) {
+ body(firstId);
+ return true;
+ }
+ return false;
+ }
+ };
+
+    // `TLocalExecutor` provides facilities for easy parallelization of existing code and loops.
+ //
+ // Examples:
+    //    Execute one task with medium priority and wait for its completion.
+ // ```
+ // LocalExecutor().Run(4);
+ // TEvent event;
+    // LocalExecutor().Exec([&event](int) {
+ // SomeFunc();
+ // event.Signal();
+ // }, 0, TLocalExecutor::MED_PRIORITY);
+ //
+ // SomeOtherCode();
+ // event.WaitI();
+ // ```
+ //
+    //    Execute a range of tasks with medium priority.
+ // ```
+ // LocalExecutor().Run(4);
+ // LocalExecutor().ExecRange([](int id) {
+ // SomeFunc(id);
+ // }, TExecRangeParams(0, 10), TLocalExecutor::WAIT_COMPLETE | TLocalExecutor::MED_PRIORITY);
+ // ```
+ //
+ class TLocalExecutor final: public ILocalExecutor {
+ public:
+ using EFlags = ILocalExecutor::EFlags;
+
+        // Creates an executor without threads. You'll need to explicitly call `RunAdditionalThreads`
+        // to add threads to the underlying thread pool.
+ //
+ TLocalExecutor();
+ ~TLocalExecutor();
+
+ int GetQueueSize() const noexcept;
+ int GetMPQueueSize() const noexcept;
+ int GetLPQueueSize() const noexcept;
+ void ClearLPQueue();
+
+ // 0-based TLocalExecutor worker thread identification
+ int GetWorkerThreadId() const noexcept override;
+ int GetThreadCount() const noexcept override;
+
+ // **Add** threads to underlying thread pool.
+ //
+ // @param threadCount Number of threads to add.
+ void RunAdditionalThreads(int threadCount);
+
+ // Add task for further execution.
+ //
+ // @param exec Task description.
+ // @param id Task argument.
+    // @param flags Bitmask composed of `HIGH_PRIORITY`, `MED_PRIORITY`, `LOW_PRIORITY`
+ // and `WAIT_COMPLETE`.
+ void Exec(TIntrusivePtr<ILocallyExecutable> exec, int id, int flags) override;
+
+    // Add a range of tasks for further execution.
+ //
+ // @param exec Task description.
+ // @param firstId, lastId Task arguments [firstId, lastId)
+ // @param flags Same as for `Exec`.
+ void ExecRange(TIntrusivePtr<ILocallyExecutable> exec, int firstId, int lastId, int flags) override;
+
+ using ILocalExecutor::Exec;
+ using ILocalExecutor::ExecRange;
+
+ private:
+ class TImpl;
+ THolder<TImpl> Impl_;
+ };
+
+ static inline TLocalExecutor& LocalExecutor() {
+ return *Singleton<TLocalExecutor>();
+ }
+
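+    // Runs `body` for each id in `[from, to)`, splitting the range into one block per thread,
+    // and waits for completion. The overload without an explicit executor uses the
+    // `LocalExecutor()` singleton. A minimal usage sketch (`Process` is a hypothetical
+    // caller-provided function):
+    // ```
+    // NPar::ParallelFor(0, 100, [](int i) {
+    //     Process(i);
+    // });
+    // ```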
+ template <typename TBody>
+ inline void ParallelFor(ILocalExecutor& executor, ui32 from, ui32 to, TBody&& body) {
+ ILocalExecutor::TExecRangeParams params(from, to);
+ params.SetBlockCountToThreadCount();
+ executor.ExecRange(std::forward<TBody>(body), params, TLocalExecutor::WAIT_COMPLETE);
+ }
+
+ template <typename TBody>
+ inline void ParallelFor(ui32 from, ui32 to, TBody&& body) {
+ ParallelFor(LocalExecutor(), from, to, std::forward<TBody>(body));
+ }
+
+ template <typename TBody>
+ inline void AsyncParallelFor(ui32 from, ui32 to, TBody&& body) {
+ ILocalExecutor::TExecRangeParams params(from, to);
+ params.SetBlockCountToThreadCount();
+ LocalExecutor().ExecRange(std::forward<TBody>(body), params, 0);
+ }
+}
diff --git a/library/cpp/threading/local_executor/tbb_local_executor.cpp b/library/cpp/threading/local_executor/tbb_local_executor.cpp
new file mode 100644
index 00000000000..65d66594438
--- /dev/null
+++ b/library/cpp/threading/local_executor/tbb_local_executor.cpp
@@ -0,0 +1,53 @@
+#include "tbb_local_executor.h"
+
+template <bool RespectTls>
+void NPar::TTbbLocalExecutor<RespectTls>::SubmitAsyncTasks(TLocallyExecutableFunction exec, int firstId, int lastId) {
+ for (int i = firstId; i < lastId; ++i) {
+ Group.run([=] { exec(i); });
+ }
+}
+
+template <bool RespectTls>
+int NPar::TTbbLocalExecutor<RespectTls>::GetThreadCount() const noexcept {
+    // The arena's thread count includes the calling thread, so report additional threads only.
+    return NumberOfTbbThreads - 1;
+}
+
+template <bool RespectTls>
+int NPar::TTbbLocalExecutor<RespectTls>::GetWorkerThreadId() const noexcept {
+ return TbbArena.execute([] {
+ return tbb::this_task_arena::current_thread_index();
+ });
+}
+
+template <bool RespectTls>
+void NPar::TTbbLocalExecutor<RespectTls>::Exec(TIntrusivePtr<ILocallyExecutable> exec, int id, int flags) {
+ if (flags & WAIT_COMPLETE) {
+ exec->LocalExec(id);
+ } else {
+ TbbArena.execute([=] {
+ SubmitAsyncTasks([=] (int id) { exec->LocalExec(id); }, id, id + 1);
+ });
+ }
+}
+
+template <bool RespectTls>
+void NPar::TTbbLocalExecutor<RespectTls>::ExecRange(TIntrusivePtr<ILocallyExecutable> exec, int firstId, int lastId, int flags) {
+ if (flags & WAIT_COMPLETE) {
+ TbbArena.execute([=] {
+ if (RespectTls) {
+ tbb::this_task_arena::isolate([=]{
+ tbb::parallel_for(firstId, lastId, [=] (int id) { exec->LocalExec(id); });
+ });
+ } else {
+ tbb::parallel_for(firstId, lastId, [=] (int id) { exec->LocalExec(id); });
+ }
+ });
+ } else {
+ TbbArena.execute([=] {
+ SubmitAsyncTasks([=] (int id) { exec->LocalExec(id); }, firstId, lastId);
+ });
+ }
+}
+
+template class NPar::TTbbLocalExecutor<true>;
+template class NPar::TTbbLocalExecutor<false>;
diff --git a/library/cpp/threading/local_executor/tbb_local_executor.h b/library/cpp/threading/local_executor/tbb_local_executor.h
new file mode 100644
index 00000000000..8d790db18c8
--- /dev/null
+++ b/library/cpp/threading/local_executor/tbb_local_executor.h
@@ -0,0 +1,49 @@
+#pragma once
+
+#include "local_executor.h"
+#define __TBB_TASK_ISOLATION 1
+#define __TBB_NO_IMPLICIT_LINKAGE 1
+
+#include <contrib/libs/tbb/include/tbb/blocked_range.h>
+#include <contrib/libs/tbb/include/tbb/parallel_for.h>
+#include <contrib/libs/tbb/include/tbb/task_arena.h>
+#include <contrib/libs/tbb/include/tbb/task_group.h>
+
+namespace NPar {
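+    // ILocalExecutor implementation on top of a TBB task arena. Priority flags are not used;
+    // ranges with WAIT_COMPLETE run via tbb::parallel_for, and async tasks are submitted to a
+    // task group. A minimal usage sketch (assuming a 4-thread arena is wanted):
+    // ```
+    // NPar::TTbbLocalExecutor<> executor(4);
+    // NPar::ParallelFor(executor, 0, 100, [](int id) { /* work on id */ });
+    // ```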
+ template <bool RespectTls = false>
+ class TTbbLocalExecutor final: public ILocalExecutor {
+ public:
+ TTbbLocalExecutor(int nThreads)
+ : ILocalExecutor()
+ , TbbArena(nThreads)
+ , NumberOfTbbThreads(nThreads) {}
+ ~TTbbLocalExecutor() noexcept override {}
+
+ // 0-based ILocalExecutor worker thread identification
+ virtual int GetWorkerThreadId() const noexcept override;
+ virtual int GetThreadCount() const noexcept override;
+
+ // Add task for further execution.
+ //
+ // @param exec Task description.
+ // @param id Task argument.
+    // @param flags Bitmask composed of `HIGH_PRIORITY`, `MED_PRIORITY`, `LOW_PRIORITY`
+ // and `WAIT_COMPLETE`.
+ virtual void Exec(TIntrusivePtr<ILocallyExecutable> exec, int id, int flags) override;
+
+    // Add a range of tasks for further execution.
+ //
+ // @param exec Task description.
+ // @param firstId, lastId Task arguments [firstId, lastId)
+ // @param flags Same as for `Exec`.
+ virtual void ExecRange(TIntrusivePtr<ILocallyExecutable> exec, int firstId, int lastId, int flags) override;
+
+ // Submit tasks for async run
+ void SubmitAsyncTasks(TLocallyExecutableFunction exec, int firstId, int lastId);
+
+ private:
+ mutable tbb::task_arena TbbArena;
+ tbb::task_group Group;
+ int NumberOfTbbThreads;
+ };
+}
diff --git a/library/cpp/threading/local_executor/ut/local_executor_ut.cpp b/library/cpp/threading/local_executor/ut/local_executor_ut.cpp
new file mode 100644
index 00000000000..ac5737717cd
--- /dev/null
+++ b/library/cpp/threading/local_executor/ut/local_executor_ut.cpp
@@ -0,0 +1,371 @@
+#include <library/cpp/threading/local_executor/local_executor.h>
+#include <library/cpp/threading/future/future.h>
+
+#include <library/cpp/testing/unittest/registar.h>
+#include <util/system/mutex.h>
+#include <util/system/rwlock.h>
+#include <util/generic/algorithm.h>
+
+using namespace NPar;
+
+class TTestException: public yexception {
+};
+
+static const int DefaultThreadsCount = 41;
+static const int DefaultRangeSize = 999;
+
+Y_UNIT_TEST_SUITE(ExecRangeWithFutures) {
+    bool AllOf(const TVector<int>& vec, int value) {
+        return AllOf(vec, [value](int element) { return value == element; });
+    }
+
+void AsyncRunAndWaitFuturesReady(int rangeSize, int threads) {
+ TLocalExecutor localExecutor;
+ localExecutor.RunAdditionalThreads(threads);
+ TAtomic signal = 0;
+ TVector<int> data(rangeSize, 0);
+ TVector<NThreading::TFuture<void>> futures = localExecutor.ExecRangeWithFutures([&signal, &data](int i) {
+ UNIT_ASSERT(data[i] == 0);
+ while (AtomicGet(signal) == 0)
+ ;
+ data[i] += 1;
+ },
+ 0, rangeSize, TLocalExecutor::HIGH_PRIORITY);
+ UNIT_ASSERT(AllOf(data, 0));
+ for (auto& future : futures)
+ UNIT_ASSERT(!future.HasValue());
+ AtomicSet(signal, 1);
+ for (auto& future : futures) {
+ future.GetValueSync();
+ }
+ UNIT_ASSERT(AllOf(data, 1));
+}
+
+Y_UNIT_TEST(AsyncRunRangeAndWaitFuturesReady) {
+ AsyncRunAndWaitFuturesReady(DefaultRangeSize, DefaultThreadsCount);
+}
+
+Y_UNIT_TEST(AsyncRunOneTaskAndWaitFuturesReady) {
+ AsyncRunAndWaitFuturesReady(1, DefaultThreadsCount);
+}
+
+Y_UNIT_TEST(AsyncRunRangeAndWaitFuturesReadyOneExtraThread) {
+ AsyncRunAndWaitFuturesReady(DefaultRangeSize, 1);
+}
+
+Y_UNIT_TEST(AsyncRunOneThreadAndWaitFuturesReadyOneExtraThread) {
+ AsyncRunAndWaitFuturesReady(1, 1);
+}
+
+Y_UNIT_TEST(AsyncRunTwoRangesAndWaitFuturesReady) {
+ TLocalExecutor localExecutor;
+ localExecutor.RunAdditionalThreads(DefaultThreadsCount);
+ TAtomic signal = 0;
+ TVector<int> data1(DefaultRangeSize, 0);
+ TVector<NThreading::TFuture<void>> futures1 = localExecutor.ExecRangeWithFutures([&signal, &data1](int i) {
+ UNIT_ASSERT(data1[i] == 0);
+ while (AtomicGet(signal) == 0)
+ ;
+ data1[i] += 1;
+ },
+ 0, DefaultRangeSize, TLocalExecutor::HIGH_PRIORITY);
+ TVector<int> data2(DefaultRangeSize, 0);
+ TVector<NThreading::TFuture<void>> futures2 = localExecutor.ExecRangeWithFutures([&signal, &data2](int i) {
+ UNIT_ASSERT(data2[i] == 0);
+ while (AtomicGet(signal) == 0)
+ ;
+ data2[i] += 2;
+ },
+ 0, DefaultRangeSize, TLocalExecutor::HIGH_PRIORITY);
+ UNIT_ASSERT(AllOf(data1, 0));
+ UNIT_ASSERT(AllOf(data2, 0));
+ AtomicSet(signal, 1);
+ for (int i = 0; i < DefaultRangeSize; ++i) {
+ futures1[i].GetValueSync();
+ futures2[i].GetValueSync();
+ }
+ UNIT_ASSERT(AllOf(data1, 1));
+ UNIT_ASSERT(AllOf(data2, 2));
+}
+
+void AsyncRunRangeAndWaitExceptions(int rangeSize, int threadsCount) {
+ TLocalExecutor localExecutor;
+ localExecutor.RunAdditionalThreads(threadsCount);
+ TAtomic signal = 0;
+ TVector<int> data(rangeSize, 0);
+ TVector<NThreading::TFuture<void>> futures = localExecutor.ExecRangeWithFutures([&signal, &data](int i) {
+ UNIT_ASSERT(data[i] == 0);
+ while (AtomicGet(signal) == 0)
+ ;
+ data[i] += 1;
+ throw 10000 + i;
+ },
+ 0, rangeSize, TLocalExecutor::HIGH_PRIORITY);
+ UNIT_ASSERT(AllOf(data, 0));
+ UNIT_ASSERT(futures.ysize() == rangeSize);
+ AtomicSet(signal, 1);
+ int exceptionsCaught = 0;
+ for (int i = 0; i < rangeSize; ++i) {
+ try {
+ futures[i].GetValueSync();
+ } catch (int& e) {
+ if (e == 10000 + i) {
+ ++exceptionsCaught;
+ }
+ }
+ }
+ UNIT_ASSERT(exceptionsCaught == rangeSize);
+ UNIT_ASSERT(AllOf(data, 1));
+}
+
+Y_UNIT_TEST(AsyncRunRangeAndWaitExceptions) {
+ AsyncRunRangeAndWaitExceptions(DefaultRangeSize, DefaultThreadsCount);
+}
+
+Y_UNIT_TEST(AsyncRunOneTaskAndWaitExceptions) {
+ AsyncRunRangeAndWaitExceptions(1, DefaultThreadsCount);
+}
+
+Y_UNIT_TEST(AsyncRunRangeAndWaitExceptionsOneExtraThread) {
+ AsyncRunRangeAndWaitExceptions(DefaultRangeSize, 1);
+}
+
+Y_UNIT_TEST(AsyncRunOneTaskAndWaitExceptionsOneExtraThread) {
+ AsyncRunRangeAndWaitExceptions(1, 1);
+}
+
+Y_UNIT_TEST(AsyncRunTwoRangesAndWaitExceptions) {
+ TLocalExecutor localExecutor;
+ localExecutor.RunAdditionalThreads(DefaultThreadsCount);
+ TAtomic signal = 0;
+ TVector<int> data1(DefaultRangeSize, 0);
+ TVector<NThreading::TFuture<void>> futures1 = localExecutor.ExecRangeWithFutures([&signal, &data1](int i) {
+ UNIT_ASSERT(data1[i] == 0);
+ while (AtomicGet(signal) == 0)
+ ;
+ data1[i] += 1;
+ throw 15000 + i;
+ },
+ 0, DefaultRangeSize, TLocalExecutor::LOW_PRIORITY);
+ TVector<int> data2(DefaultRangeSize, 0);
+ TVector<NThreading::TFuture<void>> futures2 = localExecutor.ExecRangeWithFutures([&signal, &data2](int i) {
+ UNIT_ASSERT(data2[i] == 0);
+ while (AtomicGet(signal) == 0)
+ ;
+ data2[i] += 2;
+ throw 16000 + i;
+ },
+ 0, DefaultRangeSize, TLocalExecutor::HIGH_PRIORITY);
+
+ UNIT_ASSERT(AllOf(data1, 0));
+ UNIT_ASSERT(AllOf(data2, 0));
+ UNIT_ASSERT(futures1.size() == DefaultRangeSize);
+ UNIT_ASSERT(futures2.size() == DefaultRangeSize);
+ AtomicSet(signal, 1);
+ int exceptionsCaught = 0;
+ for (int i = 0; i < DefaultRangeSize; ++i) {
+ try {
+ futures1[i].GetValueSync();
+ } catch (int& e) {
+ if (e == 15000 + i) {
+ ++exceptionsCaught;
+ }
+ }
+ try {
+ futures2[i].GetValueSync();
+ } catch (int& e) {
+ if (e == 16000 + i) {
+ ++exceptionsCaught;
+ }
+ }
+ }
+ UNIT_ASSERT(exceptionsCaught == 2 * DefaultRangeSize);
+ UNIT_ASSERT(AllOf(data1, 1));
+ UNIT_ASSERT(AllOf(data2, 2));
+}
+
+void RunRangeAndCheckExceptionsWithWaitComplete(int rangeSize, int threadsCount) {
+ TLocalExecutor localExecutor;
+ localExecutor.RunAdditionalThreads(threadsCount);
+ TVector<int> data(rangeSize, 0);
+ TVector<NThreading::TFuture<void>> futures = localExecutor.ExecRangeWithFutures([&data](int i) {
+ UNIT_ASSERT(data[i] == 0);
+ data[i] += 1;
+ throw 30000 + i;
+ },
+ 0, rangeSize, TLocalExecutor::EFlags::WAIT_COMPLETE);
+ UNIT_ASSERT(AllOf(data, 1));
+ int exceptionsCaught = 0;
+ for (int i = 0; i < rangeSize; ++i) {
+ try {
+ futures[i].GetValueSync();
+ } catch (int& e) {
+ if (e == 30000 + i) {
+ ++exceptionsCaught;
+ }
+ }
+ }
+ UNIT_ASSERT(exceptionsCaught == rangeSize);
+ UNIT_ASSERT(AllOf(data, 1));
+}
+
+Y_UNIT_TEST(RunRangeAndCheckExceptionsWithWaitComplete) {
+ RunRangeAndCheckExceptionsWithWaitComplete(DefaultRangeSize, DefaultThreadsCount);
+}
+
+Y_UNIT_TEST(RunOneAndCheckExceptionsWithWaitComplete) {
+ RunRangeAndCheckExceptionsWithWaitComplete(1, DefaultThreadsCount);
+}
+
+Y_UNIT_TEST(RunRangeAndCheckExceptionsWithWaitCompleteOneExtraThread) {
+ RunRangeAndCheckExceptionsWithWaitComplete(DefaultRangeSize, 1);
+}
+
+Y_UNIT_TEST(RunOneAndCheckExceptionsWithWaitCompleteOneExtraThread) {
+ RunRangeAndCheckExceptionsWithWaitComplete(1, 1);
+}
+
+Y_UNIT_TEST(RunRangeAndCheckExceptionsWithWaitCompleteZeroExtraThreads) {
+ RunRangeAndCheckExceptionsWithWaitComplete(DefaultRangeSize, 0);
+}
+
+Y_UNIT_TEST(RunOneAndCheckExceptionsWithWaitCompleteZeroExtraThreads) {
+ RunRangeAndCheckExceptionsWithWaitComplete(1, 0);
+}
+};
+
+Y_UNIT_TEST_SUITE(ExecRangeWithThrow) {
+    void RunParallelWhichThrowsTTestException(int rangeStart, int rangeSize, int threadsCount, int flags, TAtomic& processed) {
+        AtomicSet(processed, 0);
+        TLocalExecutor localExecutor;
+        localExecutor.RunAdditionalThreads(threadsCount);
+        localExecutor.ExecRangeWithThrow([&processed](int) {
+            AtomicAdd(processed, 1);
+            throw TTestException();
+        },
+            rangeStart, rangeStart + rangeSize, flags);
+    }
+
+Y_UNIT_TEST(RunParallelWhichThrowsTTestException) {
+ TAtomic processed = 0;
+ UNIT_ASSERT_EXCEPTION(
+ RunParallelWhichThrowsTTestException(10, 40, DefaultThreadsCount,
+ TLocalExecutor::EFlags::WAIT_COMPLETE, processed),
+ TTestException);
+ UNIT_ASSERT(AtomicGet(processed) == 40);
+}
+
+void ThrowAndCatchTTestException(int rangeSize, int threadsCount, int flags) {
+ TAtomic processed = 0;
+ UNIT_ASSERT_EXCEPTION(
+ RunParallelWhichThrowsTTestException(0, rangeSize, threadsCount, flags, processed),
+ TTestException);
+ UNIT_ASSERT(AtomicGet(processed) == rangeSize);
+}
+
+Y_UNIT_TEST(ThrowAndCatchTTestExceptionLowPriority) {
+ ThrowAndCatchTTestException(DefaultRangeSize, DefaultThreadsCount,
+ TLocalExecutor::EFlags::WAIT_COMPLETE | TLocalExecutor::EFlags::LOW_PRIORITY);
+}
+
+Y_UNIT_TEST(ThrowAndCatchTTestExceptionMedPriority) {
+ ThrowAndCatchTTestException(DefaultRangeSize, DefaultThreadsCount,
+ TLocalExecutor::EFlags::WAIT_COMPLETE | TLocalExecutor::EFlags::MED_PRIORITY);
+}
+
+Y_UNIT_TEST(ThrowAndCatchTTestExceptionHighPriority) {
+ ThrowAndCatchTTestException(DefaultRangeSize, DefaultThreadsCount,
+ TLocalExecutor::EFlags::WAIT_COMPLETE | TLocalExecutor::EFlags::HIGH_PRIORITY);
+}
+
+Y_UNIT_TEST(ThrowAndCatchTTestExceptionWaitComplete) {
+ ThrowAndCatchTTestException(DefaultRangeSize, DefaultThreadsCount,
+ TLocalExecutor::EFlags::WAIT_COMPLETE);
+}
+
+Y_UNIT_TEST(RethrowExceptionSequentialWaitComplete) {
+ ThrowAndCatchTTestException(DefaultRangeSize, 0,
+ TLocalExecutor::EFlags::WAIT_COMPLETE);
+}
+
+Y_UNIT_TEST(RethrowExceptionOneExtraThreadWaitComplete) {
+ ThrowAndCatchTTestException(DefaultRangeSize, 1,
+ TLocalExecutor::EFlags::WAIT_COMPLETE);
+}
+
+void ThrowsTTestExceptionFromNested(TLocalExecutor& localExecutor) {
+ localExecutor.ExecRangeWithThrow([](int) {
+ throw TTestException();
+ },
+ 0, 10, TLocalExecutor::EFlags::WAIT_COMPLETE);
+}
+
+void CatchTTestExceptionFromNested(TAtomic& processed1, TAtomic& processed2) {
+ TLocalExecutor localExecutor;
+ localExecutor.RunAdditionalThreads(DefaultThreadsCount);
+ localExecutor.ExecRangeWithThrow([&processed1, &processed2, &localExecutor](int) {
+ AtomicAdd(processed1, 1);
+ UNIT_ASSERT_EXCEPTION(
+ ThrowsTTestExceptionFromNested(localExecutor),
+ TTestException);
+ AtomicAdd(processed2, 1);
+ },
+ 0, DefaultRangeSize, TLocalExecutor::EFlags::WAIT_COMPLETE);
+}
+
+Y_UNIT_TEST(NestedParallelExceptionsDoNotLeak) {
+ TAtomic processed1 = 0;
+ TAtomic processed2 = 0;
+ UNIT_ASSERT_NO_EXCEPTION(
+ CatchTTestExceptionFromNested(processed1, processed2));
+ UNIT_ASSERT_EQUAL(AtomicGet(processed1), DefaultRangeSize);
+ UNIT_ASSERT_EQUAL(AtomicGet(processed2), DefaultRangeSize);
+}
+};
+
+Y_UNIT_TEST_SUITE(ExecLargeRangeWithThrow) {
+
+    constexpr int LARGE_COUNT = 128 * (1 << 20); // 128 Mi tasks
+
+ static auto IsValue(char v) {
+ return [=](char c) { return c == v; };
+ }
+
+ Y_UNIT_TEST(ExecLargeRangeNoExceptions) {
+ TVector<char> tasks(LARGE_COUNT);
+
+ TLocalExecutor localExecutor;
+ localExecutor.RunAdditionalThreads(DefaultThreadsCount);
+
+ localExecutor.ExecRangeBlockedWithThrow([&tasks](int i) {
+ tasks[i] = 1;
+ }, 0, tasks.size(), 0, TLocalExecutor::EFlags::WAIT_COMPLETE);
+ UNIT_ASSERT(AllOf(tasks, IsValue(1)));
+
+
+ localExecutor.ExecRangeBlockedWithThrow([&tasks](int i) {
+ tasks[i] += 1;
+ }, 0, tasks.size(), 128, TLocalExecutor::EFlags::WAIT_COMPLETE);
+ UNIT_ASSERT(AllOf(tasks, IsValue(2)));
+ }
+
+ Y_UNIT_TEST(ExecLargeRangeWithException) {
+ TVector<char> tasks(LARGE_COUNT);
+
+ TLocalExecutor localExecutor;
+ localExecutor.RunAdditionalThreads(DefaultThreadsCount);
+
+ Fill(tasks.begin(), tasks.end(), 0);
+ UNIT_ASSERT_EXCEPTION(
+ localExecutor.ExecRangeBlockedWithThrow([&tasks](int i) {
+ tasks[i] += 1;
+ if (i == LARGE_COUNT / 2) {
+ throw TTestException();
+ }
+ }, 0, tasks.size(), 0, TLocalExecutor::EFlags::WAIT_COMPLETE),
+ TTestException
+ );
+ }
+};
diff --git a/library/cpp/threading/local_executor/ut/ya.make b/library/cpp/threading/local_executor/ut/ya.make
new file mode 100644
index 00000000000..be579a5ca06
--- /dev/null
+++ b/library/cpp/threading/local_executor/ut/ya.make
@@ -0,0 +1,12 @@
+OWNER(
+ g:matrixnet
+ gulin
+)
+
+UNITTEST_FOR(library/cpp/threading/local_executor)
+
+SRCS(
+ local_executor_ut.cpp
+)
+
+END()
diff --git a/library/cpp/threading/local_executor/ya.make b/library/cpp/threading/local_executor/ya.make
new file mode 100644
index 00000000000..df210f92bb6
--- /dev/null
+++ b/library/cpp/threading/local_executor/ya.make
@@ -0,0 +1,20 @@
+OWNER(
+ g:matrixnet
+ gulin
+ kirillovs
+ espetrov
+)
+
+LIBRARY()
+
+SRCS(
+ local_executor.cpp
+ tbb_local_executor.cpp
+)
+
+PEERDIR(
+ contrib/libs/tbb
+ library/cpp/threading/future
+)
+
+END()