diff options
author | yazevnul <yazevnul@yandex-team.ru> | 2022-02-10 16:46:46 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:46:46 +0300 |
commit | 8cbc307de0221f84c80c42dcbe07d40727537e2c (patch) | |
tree | 625d5a673015d1df891e051033e9fcde5c7be4e5 /library/cpp/threading/local_executor | |
parent | 30d1ef3941e0dc835be7609de5ebee66958f215a (diff) | |
download | ydb-8cbc307de0221f84c80c42dcbe07d40727537e2c.tar.gz |
Restoring authorship annotation for <yazevnul@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/threading/local_executor')
5 files changed, 363 insertions, 363 deletions
diff --git a/library/cpp/threading/local_executor/local_executor.cpp b/library/cpp/threading/local_executor/local_executor.cpp index 1d3fbb4bf4..6e62d09d85 100644 --- a/library/cpp/threading/local_executor/local_executor.cpp +++ b/library/cpp/threading/local_executor/local_executor.cpp @@ -1,17 +1,17 @@ #include "local_executor.h" #include <library/cpp/threading/future/future.h> - -#include <util/generic/utility.h> -#include <util/system/atomic.h> -#include <util/system/event.h> + +#include <util/generic/utility.h> +#include <util/system/atomic.h> +#include <util/system/event.h> #include <util/system/thread.h> -#include <util/system/tls.h> +#include <util/system/tls.h> #include <util/system/yield.h> -#include <util/thread/lfqueue.h> - -#include <utility> +#include <util/thread/lfqueue.h> +#include <utility> + #ifdef _win_ static void RegularYield() { } @@ -23,11 +23,11 @@ static void RegularYield() { } #endif -namespace { - struct TFunctionWrapper : NPar::ILocallyExecutable { - NPar::TLocallyExecutableFunction Exec; - TFunctionWrapper(NPar::TLocallyExecutableFunction exec) - : Exec(std::move(exec)) +namespace { + struct TFunctionWrapper : NPar::ILocallyExecutable { + NPar::TLocallyExecutableFunction Exec; + TFunctionWrapper(NPar::TLocallyExecutableFunction exec) + : Exec(std::move(exec)) { } void LocalExec(int id) override { @@ -35,15 +35,15 @@ namespace { } }; - class TFunctionWrapperWithPromise: public NPar::ILocallyExecutable { + class TFunctionWrapperWithPromise: public NPar::ILocallyExecutable { private: - NPar::TLocallyExecutableFunction Exec; + NPar::TLocallyExecutableFunction Exec; int FirstId, LastId; TVector<NThreading::TPromise<void>> Promises; public: - TFunctionWrapperWithPromise(NPar::TLocallyExecutableFunction exec, int firstId, int lastId) - : Exec(std::move(exec)) + TFunctionWrapperWithPromise(NPar::TLocallyExecutableFunction exec, int firstId, int lastId) + : Exec(std::move(exec)) , FirstId(firstId) , LastId(lastId) { @@ -70,300 +70,300 @@ namespace { } }; - struct TSingleJob { - TIntrusivePtr<NPar::ILocallyExecutable> Exec; - int Id{0}; + struct TSingleJob { + TIntrusivePtr<NPar::ILocallyExecutable> Exec; + int Id{0}; - TSingleJob() = default; - TSingleJob(TIntrusivePtr<NPar::ILocallyExecutable> exec, int id) - : Exec(std::move(exec)) - , Id(id) - { + TSingleJob() = default; + TSingleJob(TIntrusivePtr<NPar::ILocallyExecutable> exec, int id) + : Exec(std::move(exec)) + , Id(id) + { } - }; + }; - class TLocalRangeExecutor: public NPar::ILocallyExecutable { - TIntrusivePtr<NPar::ILocallyExecutable> Exec; + class TLocalRangeExecutor: public NPar::ILocallyExecutable { + TIntrusivePtr<NPar::ILocallyExecutable> Exec; alignas(64) TAtomic Counter; alignas(64) TAtomic WorkerCount; - int LastId; - - void LocalExec(int) override { - AtomicAdd(WorkerCount, 1); - for (;;) { - if (!DoSingleOp()) - break; - } - AtomicAdd(WorkerCount, -1); + int LastId; + + void LocalExec(int) override { + AtomicAdd(WorkerCount, 1); + for (;;) { + if (!DoSingleOp()) + break; + } + AtomicAdd(WorkerCount, -1); } - public: - TLocalRangeExecutor(TIntrusivePtr<ILocallyExecutable> exec, int firstId, int lastId) - : Exec(std::move(exec)) - , Counter(firstId) - , WorkerCount(0) - , LastId(lastId) - { + public: + TLocalRangeExecutor(TIntrusivePtr<ILocallyExecutable> exec, int firstId, int lastId) + : Exec(std::move(exec)) + , Counter(firstId) + , WorkerCount(0) + , LastId(lastId) + { } - bool DoSingleOp() { + bool DoSingleOp() { const int id = AtomicAdd(Counter, 1) - 1; - if (id >= LastId) - return false; - Exec->LocalExec(id); - RegularYield(); - return true; + if (id >= LastId) + return false; + Exec->LocalExec(id); + RegularYield(); + return true; } - void WaitComplete() { - while (AtomicGet(WorkerCount) > 0) - RegularYield(); + void WaitComplete() { + while (AtomicGet(WorkerCount) > 0) + RegularYield(); } - int GetRangeSize() const { - return Max<int>(LastId - Counter, 0); + int GetRangeSize() const { + return Max<int>(LastId - Counter, 0); } - }; + }; } - -////////////////////////////////////////////////////////////////////////// -class NPar::TLocalExecutor::TImpl { -public: - TLockFreeQueue<TSingleJob> JobQueue; - TLockFreeQueue<TSingleJob> MedJobQueue; - TLockFreeQueue<TSingleJob> LowJobQueue; + +////////////////////////////////////////////////////////////////////////// +class NPar::TLocalExecutor::TImpl { +public: + TLockFreeQueue<TSingleJob> JobQueue; + TLockFreeQueue<TSingleJob> MedJobQueue; + TLockFreeQueue<TSingleJob> LowJobQueue; alignas(64) TSystemEvent HasJob; - - TAtomic ThreadCount{0}; + + TAtomic ThreadCount{0}; alignas(64) TAtomic QueueSize{0}; - TAtomic MPQueueSize{0}; - TAtomic LPQueueSize{0}; - TAtomic ThreadId{0}; - - Y_THREAD(int) - CurrentTaskPriority; - Y_THREAD(int) - WorkerThreadId; - - static void* HostWorkerThread(void* p); - bool GetJob(TSingleJob* job); - void RunNewThread(); - void LaunchRange(TIntrusivePtr<TLocalRangeExecutor> execRange, int queueSizeLimit, - TAtomic* queueSize, TLockFreeQueue<TSingleJob>* jobQueue); - - TImpl() = default; - ~TImpl(); -}; - -NPar::TLocalExecutor::TImpl::~TImpl() { - AtomicAdd(QueueSize, 1); - JobQueue.Enqueue(TSingleJob(nullptr, 0)); - HasJob.Signal(); - while (AtomicGet(ThreadCount)) { - ThreadYield(); - } -} - -void* NPar::TLocalExecutor::TImpl::HostWorkerThread(void* p) { - static const int FAST_ITERATIONS = 200; - - auto* const ctx = (TImpl*)p; + TAtomic MPQueueSize{0}; + TAtomic LPQueueSize{0}; + TAtomic ThreadId{0}; + + Y_THREAD(int) + CurrentTaskPriority; + Y_THREAD(int) + WorkerThreadId; + + static void* HostWorkerThread(void* p); + bool GetJob(TSingleJob* job); + void RunNewThread(); + void LaunchRange(TIntrusivePtr<TLocalRangeExecutor> execRange, int queueSizeLimit, + TAtomic* queueSize, TLockFreeQueue<TSingleJob>* jobQueue); + + TImpl() = default; + ~TImpl(); +}; + +NPar::TLocalExecutor::TImpl::~TImpl() { + AtomicAdd(QueueSize, 1); + JobQueue.Enqueue(TSingleJob(nullptr, 0)); + HasJob.Signal(); + while (AtomicGet(ThreadCount)) { + ThreadYield(); + } +} + +void* NPar::TLocalExecutor::TImpl::HostWorkerThread(void* p) { + static const int FAST_ITERATIONS = 200; + + auto* const ctx = (TImpl*)p; TThread::SetCurrentThreadName("ParLocalExecutor"); - ctx->WorkerThreadId = AtomicAdd(ctx->ThreadId, 1); - for (bool cont = true; cont;) { - TSingleJob job; - bool gotJob = false; - for (int iter = 0; iter < FAST_ITERATIONS; ++iter) { - if (ctx->GetJob(&job)) { - gotJob = true; - break; - } - } - if (!gotJob) { - ctx->HasJob.Reset(); - if (!ctx->GetJob(&job)) { - ctx->HasJob.Wait(); - continue; - } - } - if (job.Exec.Get()) { - job.Exec->LocalExec(job.Id); - RegularYield(); - } else { - AtomicAdd(ctx->QueueSize, 1); - ctx->JobQueue.Enqueue(job); - ctx->HasJob.Signal(); - cont = false; - } - } - AtomicAdd(ctx->ThreadCount, -1); - return nullptr; -} - -bool NPar::TLocalExecutor::TImpl::GetJob(TSingleJob* job) { - if (JobQueue.Dequeue(job)) { - CurrentTaskPriority = TLocalExecutor::HIGH_PRIORITY; - AtomicAdd(QueueSize, -1); - return true; - } else if (MedJobQueue.Dequeue(job)) { - CurrentTaskPriority = TLocalExecutor::MED_PRIORITY; - AtomicAdd(MPQueueSize, -1); - return true; - } else if (LowJobQueue.Dequeue(job)) { - CurrentTaskPriority = TLocalExecutor::LOW_PRIORITY; - AtomicAdd(LPQueueSize, -1); - return true; - } - return false; -} - -void NPar::TLocalExecutor::TImpl::RunNewThread() { - AtomicAdd(ThreadCount, 1); - TThread thr(HostWorkerThread, this); - thr.Start(); - thr.Detach(); -} - -void NPar::TLocalExecutor::TImpl::LaunchRange(TIntrusivePtr<TLocalRangeExecutor> rangeExec, - int queueSizeLimit, - TAtomic* queueSize, - TLockFreeQueue<TSingleJob>* jobQueue) { - int count = Min<int>(ThreadCount + 1, rangeExec->GetRangeSize()); - if (queueSizeLimit >= 0 && AtomicGet(*queueSize) >= queueSizeLimit) { - return; - } - AtomicAdd(*queueSize, count); + ctx->WorkerThreadId = AtomicAdd(ctx->ThreadId, 1); + for (bool cont = true; cont;) { + TSingleJob job; + bool gotJob = false; + for (int iter = 0; iter < FAST_ITERATIONS; ++iter) { + if (ctx->GetJob(&job)) { + gotJob = true; + break; + } + } + if (!gotJob) { + ctx->HasJob.Reset(); + if (!ctx->GetJob(&job)) { + ctx->HasJob.Wait(); + continue; + } + } + if (job.Exec.Get()) { + job.Exec->LocalExec(job.Id); + RegularYield(); + } else { + AtomicAdd(ctx->QueueSize, 1); + ctx->JobQueue.Enqueue(job); + ctx->HasJob.Signal(); + cont = false; + } + } + AtomicAdd(ctx->ThreadCount, -1); + return nullptr; +} + +bool NPar::TLocalExecutor::TImpl::GetJob(TSingleJob* job) { + if (JobQueue.Dequeue(job)) { + CurrentTaskPriority = TLocalExecutor::HIGH_PRIORITY; + AtomicAdd(QueueSize, -1); + return true; + } else if (MedJobQueue.Dequeue(job)) { + CurrentTaskPriority = TLocalExecutor::MED_PRIORITY; + AtomicAdd(MPQueueSize, -1); + return true; + } else if (LowJobQueue.Dequeue(job)) { + CurrentTaskPriority = TLocalExecutor::LOW_PRIORITY; + AtomicAdd(LPQueueSize, -1); + return true; + } + return false; +} + +void NPar::TLocalExecutor::TImpl::RunNewThread() { + AtomicAdd(ThreadCount, 1); + TThread thr(HostWorkerThread, this); + thr.Start(); + thr.Detach(); +} + +void NPar::TLocalExecutor::TImpl::LaunchRange(TIntrusivePtr<TLocalRangeExecutor> rangeExec, + int queueSizeLimit, + TAtomic* queueSize, + TLockFreeQueue<TSingleJob>* jobQueue) { + int count = Min<int>(ThreadCount + 1, rangeExec->GetRangeSize()); + if (queueSizeLimit >= 0 && AtomicGet(*queueSize) >= queueSizeLimit) { + return; + } + AtomicAdd(*queueSize, count); jobQueue->EnqueueAll(TVector<TSingleJob>{size_t(count), TSingleJob(rangeExec, 0)}); - HasJob.Signal(); -} - -NPar::TLocalExecutor::TLocalExecutor() - : Impl_{MakeHolder<TImpl>()} { -} - -NPar::TLocalExecutor::~TLocalExecutor() = default; - -void NPar::TLocalExecutor::RunAdditionalThreads(int threadCount) { - for (int i = 0; i < threadCount; i++) - Impl_->RunNewThread(); -} - -void NPar::TLocalExecutor::Exec(TIntrusivePtr<ILocallyExecutable> exec, int id, int flags) { - Y_ASSERT((flags & WAIT_COMPLETE) == 0); // unsupported - int prior = Max<int>(Impl_->CurrentTaskPriority, flags & PRIORITY_MASK); - switch (prior) { - case HIGH_PRIORITY: - AtomicAdd(Impl_->QueueSize, 1); - Impl_->JobQueue.Enqueue(TSingleJob(std::move(exec), id)); - break; - case MED_PRIORITY: - AtomicAdd(Impl_->MPQueueSize, 1); - Impl_->MedJobQueue.Enqueue(TSingleJob(std::move(exec), id)); - break; - case LOW_PRIORITY: - AtomicAdd(Impl_->LPQueueSize, 1); - Impl_->LowJobQueue.Enqueue(TSingleJob(std::move(exec), id)); - break; - default: - Y_ASSERT(0); - break; - } - Impl_->HasJob.Signal(); -} - + HasJob.Signal(); +} + +NPar::TLocalExecutor::TLocalExecutor() + : Impl_{MakeHolder<TImpl>()} { +} + +NPar::TLocalExecutor::~TLocalExecutor() = default; + +void NPar::TLocalExecutor::RunAdditionalThreads(int threadCount) { + for (int i = 0; i < threadCount; i++) + Impl_->RunNewThread(); +} + +void NPar::TLocalExecutor::Exec(TIntrusivePtr<ILocallyExecutable> exec, int id, int flags) { + Y_ASSERT((flags & WAIT_COMPLETE) == 0); // unsupported + int prior = Max<int>(Impl_->CurrentTaskPriority, flags & PRIORITY_MASK); + switch (prior) { + case HIGH_PRIORITY: + AtomicAdd(Impl_->QueueSize, 1); + Impl_->JobQueue.Enqueue(TSingleJob(std::move(exec), id)); + break; + case MED_PRIORITY: + AtomicAdd(Impl_->MPQueueSize, 1); + Impl_->MedJobQueue.Enqueue(TSingleJob(std::move(exec), id)); + break; + case LOW_PRIORITY: + AtomicAdd(Impl_->LPQueueSize, 1); + Impl_->LowJobQueue.Enqueue(TSingleJob(std::move(exec), id)); + break; + default: + Y_ASSERT(0); + break; + } + Impl_->HasJob.Signal(); +} + void NPar::ILocalExecutor::Exec(TLocallyExecutableFunction exec, int id, int flags) { - Exec(new TFunctionWrapper(std::move(exec)), id, flags); -} - -void NPar::TLocalExecutor::ExecRange(TIntrusivePtr<ILocallyExecutable> exec, int firstId, int lastId, int flags) { - Y_ASSERT(lastId >= firstId); + Exec(new TFunctionWrapper(std::move(exec)), id, flags); +} + +void NPar::TLocalExecutor::ExecRange(TIntrusivePtr<ILocallyExecutable> exec, int firstId, int lastId, int flags) { + Y_ASSERT(lastId >= firstId); if (TryExecRangeSequentially([=] (int id) { exec->LocalExec(id); }, firstId, lastId, flags)) { - return; - } - auto rangeExec = MakeIntrusive<TLocalRangeExecutor>(std::move(exec), firstId, lastId); - int queueSizeLimit = (flags & WAIT_COMPLETE) ? 10000 : -1; - int prior = Max<int>(Impl_->CurrentTaskPriority, flags & PRIORITY_MASK); - switch (prior) { - case HIGH_PRIORITY: - Impl_->LaunchRange(rangeExec, queueSizeLimit, &Impl_->QueueSize, &Impl_->JobQueue); - break; - case MED_PRIORITY: - Impl_->LaunchRange(rangeExec, queueSizeLimit, &Impl_->MPQueueSize, &Impl_->MedJobQueue); - break; - case LOW_PRIORITY: - Impl_->LaunchRange(rangeExec, queueSizeLimit, &Impl_->LPQueueSize, &Impl_->LowJobQueue); - break; - default: - Y_ASSERT(0); - break; - } - if (flags & WAIT_COMPLETE) { - int keepPrior = Impl_->CurrentTaskPriority; - Impl_->CurrentTaskPriority = prior; - while (rangeExec->DoSingleOp()) { - } - Impl_->CurrentTaskPriority = keepPrior; - rangeExec->WaitComplete(); - } -} - + return; + } + auto rangeExec = MakeIntrusive<TLocalRangeExecutor>(std::move(exec), firstId, lastId); + int queueSizeLimit = (flags & WAIT_COMPLETE) ? 10000 : -1; + int prior = Max<int>(Impl_->CurrentTaskPriority, flags & PRIORITY_MASK); + switch (prior) { + case HIGH_PRIORITY: + Impl_->LaunchRange(rangeExec, queueSizeLimit, &Impl_->QueueSize, &Impl_->JobQueue); + break; + case MED_PRIORITY: + Impl_->LaunchRange(rangeExec, queueSizeLimit, &Impl_->MPQueueSize, &Impl_->MedJobQueue); + break; + case LOW_PRIORITY: + Impl_->LaunchRange(rangeExec, queueSizeLimit, &Impl_->LPQueueSize, &Impl_->LowJobQueue); + break; + default: + Y_ASSERT(0); + break; + } + if (flags & WAIT_COMPLETE) { + int keepPrior = Impl_->CurrentTaskPriority; + Impl_->CurrentTaskPriority = prior; + while (rangeExec->DoSingleOp()) { + } + Impl_->CurrentTaskPriority = keepPrior; + rangeExec->WaitComplete(); + } +} + void NPar::ILocalExecutor::ExecRange(TLocallyExecutableFunction exec, int firstId, int lastId, int flags) { if (TryExecRangeSequentially(exec, firstId, lastId, flags)) { return; } - ExecRange(new TFunctionWrapper(exec), firstId, lastId, flags); -} - + ExecRange(new TFunctionWrapper(exec), firstId, lastId, flags); +} + void NPar::ILocalExecutor::ExecRangeWithThrow(TLocallyExecutableFunction exec, int firstId, int lastId, int flags) { - Y_VERIFY((flags & WAIT_COMPLETE) != 0, "ExecRangeWithThrow() requires WAIT_COMPLETE to wait if exceptions arise."); + Y_VERIFY((flags & WAIT_COMPLETE) != 0, "ExecRangeWithThrow() requires WAIT_COMPLETE to wait if exceptions arise."); if (TryExecRangeSequentially(exec, firstId, lastId, flags)) { return; } - TVector<NThreading::TFuture<void>> currentRun = ExecRangeWithFutures(exec, firstId, lastId, flags); - for (auto& result : currentRun) { - result.GetValueSync(); // Exception will be rethrown if exists. If several exception - only the one with minimal id is rethrown. - } -} - -TVector<NThreading::TFuture<void>> + TVector<NThreading::TFuture<void>> currentRun = ExecRangeWithFutures(exec, firstId, lastId, flags); + for (auto& result : currentRun) { + result.GetValueSync(); // Exception will be rethrown if exists. If several exception - only the one with minimal id is rethrown. + } +} + +TVector<NThreading::TFuture<void>> NPar::ILocalExecutor::ExecRangeWithFutures(TLocallyExecutableFunction exec, int firstId, int lastId, int flags) { - TFunctionWrapperWithPromise* execWrapper = new TFunctionWrapperWithPromise(exec, firstId, lastId); - TVector<NThreading::TFuture<void>> out = execWrapper->GetFutures(); - ExecRange(execWrapper, firstId, lastId, flags); - return out; -} - -void NPar::TLocalExecutor::ClearLPQueue() { - for (bool cont = true; cont;) { - cont = false; - TSingleJob job; - while (Impl_->LowJobQueue.Dequeue(&job)) { - AtomicAdd(Impl_->LPQueueSize, -1); - cont = true; - } - while (Impl_->MedJobQueue.Dequeue(&job)) { - AtomicAdd(Impl_->MPQueueSize, -1); - cont = true; - } - } -} - -int NPar::TLocalExecutor::GetQueueSize() const noexcept { - return AtomicGet(Impl_->QueueSize); -} - -int NPar::TLocalExecutor::GetMPQueueSize() const noexcept { - return AtomicGet(Impl_->MPQueueSize); -} - -int NPar::TLocalExecutor::GetLPQueueSize() const noexcept { - return AtomicGet(Impl_->LPQueueSize); -} - + TFunctionWrapperWithPromise* execWrapper = new TFunctionWrapperWithPromise(exec, firstId, lastId); + TVector<NThreading::TFuture<void>> out = execWrapper->GetFutures(); + ExecRange(execWrapper, firstId, lastId, flags); + return out; +} + +void NPar::TLocalExecutor::ClearLPQueue() { + for (bool cont = true; cont;) { + cont = false; + TSingleJob job; + while (Impl_->LowJobQueue.Dequeue(&job)) { + AtomicAdd(Impl_->LPQueueSize, -1); + cont = true; + } + while (Impl_->MedJobQueue.Dequeue(&job)) { + AtomicAdd(Impl_->MPQueueSize, -1); + cont = true; + } + } +} + +int NPar::TLocalExecutor::GetQueueSize() const noexcept { + return AtomicGet(Impl_->QueueSize); +} + +int NPar::TLocalExecutor::GetMPQueueSize() const noexcept { + return AtomicGet(Impl_->MPQueueSize); +} + +int NPar::TLocalExecutor::GetLPQueueSize() const noexcept { + return AtomicGet(Impl_->LPQueueSize); +} + int NPar::TLocalExecutor::GetWorkerThreadId() const noexcept { - return Impl_->WorkerThreadId; -} - -int NPar::TLocalExecutor::GetThreadCount() const noexcept { - return AtomicGet(Impl_->ThreadCount); -} - -////////////////////////////////////////////////////////////////////////// + return Impl_->WorkerThreadId; +} + +int NPar::TLocalExecutor::GetThreadCount() const noexcept { + return AtomicGet(Impl_->ThreadCount); +} + +////////////////////////////////////////////////////////////////////////// diff --git a/library/cpp/threading/local_executor/local_executor.h b/library/cpp/threading/local_executor/local_executor.h index c1c824f67c..aa500d34d3 100644 --- a/library/cpp/threading/local_executor/local_executor.h +++ b/library/cpp/threading/local_executor/local_executor.h @@ -1,23 +1,23 @@ #pragma once #include <library/cpp/threading/future/future.h> - + #include <util/generic/cast.h> -#include <util/generic/fwd.h> -#include <util/generic/noncopyable.h> +#include <util/generic/fwd.h> +#include <util/generic/noncopyable.h> #include <util/generic/ptr.h> -#include <util/generic/singleton.h> +#include <util/generic/singleton.h> #include <util/generic/ymath.h> - + #include <functional> namespace NPar { struct ILocallyExecutable : virtual public TThrRefBase { - // Must be implemented by the end user to define job that will be processed by one of - // executor threads. - // - // @param id Job parameter, typically an index pointing somewhere in array, or just - // some dummy value, e.g. `0`. + // Must be implemented by the end user to define job that will be processed by one of + // executor threads. + // + // @param id Job parameter, typically an index pointing somewhere in array, or just + // some dummy value, e.g. `0`. virtual void LocalExec(int id) = 0; }; @@ -31,7 +31,7 @@ namespace NPar { ILocalExecutor() = default; virtual ~ILocalExecutor() = default; - enum EFlags : int { + enum EFlags : int { HIGH_PRIORITY = 0, MED_PRIORITY = 1, LOW_PRIORITY = 2, @@ -58,8 +58,8 @@ namespace NPar { virtual int GetWorkerThreadId() const noexcept = 0; virtual int GetThreadCount() const noexcept = 0; - // Describes a range of tasks with parameters from integer range [FirstId, LastId). - // + // Describes a range of tasks with parameters from integer range [FirstId, LastId). + // class TExecRangeParams { public: template <typename TFirst, typename TLast> @@ -70,9 +70,9 @@ namespace NPar { Y_ASSERT(LastId >= FirstId); SetBlockSize(1); } - // Partition tasks into `blockCount` blocks of approximately equal size, each of which - // will be executed as a separate bigger task. - // + // Partition tasks into `blockCount` blocks of approximately equal size, each of which + // will be executed as a separate bigger task. + // template <typename TBlockCount> TExecRangeParams& SetBlockCount(TBlockCount blockCount) { Y_ASSERT(SafeIntegerCast<int>(blockCount) > 0 || FirstId == LastId); @@ -81,9 +81,9 @@ namespace NPar { BlockEqualToThreads = false; return *this; } - // Partition tasks into blocks of approximately `blockSize` size, each of which will - // be executed as a separate bigger task. - // + // Partition tasks into blocks of approximately `blockSize` size, each of which will + // be executed as a separate bigger task. + // template <typename TBlockSize> TExecRangeParams& SetBlockSize(TBlockSize blockSize) { Y_ASSERT(SafeIntegerCast<int>(blockSize) > 0 || FirstId == LastId); @@ -92,9 +92,9 @@ namespace NPar { BlockEqualToThreads = false; return *this; } - // Partition tasks into thread count blocks of approximately equal size, each of which - // will be executed as a separate bigger task. - // + // Partition tasks into thread count blocks of approximately equal size, each of which + // will be executed as a separate bigger task. + // TExecRangeParams& SetBlockCountToThreadCount() { BlockEqualToThreads = true; return *this; @@ -107,9 +107,9 @@ namespace NPar { Y_ASSERT(!BlockEqualToThreads); return BlockSize; } - bool GetBlockEqualToThreads() { - return BlockEqualToThreads; - } + bool GetBlockEqualToThreads() { + return BlockEqualToThreads; + } const int FirstId = 0; const int LastId = 0; @@ -120,26 +120,26 @@ namespace NPar { bool BlockEqualToThreads; }; - // `Exec` and `ExecRange` versions that accept functions. - // - void Exec(TLocallyExecutableFunction exec, int id, int flags); - void ExecRange(TLocallyExecutableFunction exec, int firstId, int lastId, int flags); - - // Version of `ExecRange` that throws exception from task with minimal id if at least one of - // task threw an exception. - // - void ExecRangeWithThrow(TLocallyExecutableFunction exec, int firstId, int lastId, int flags); - - // Version of `ExecRange` that returns vector of futures, thus allowing to retry any task if - // it fails. - // - TVector<NThreading::TFuture<void>> ExecRangeWithFutures(TLocallyExecutableFunction exec, int firstId, int lastId, int flags); - + // `Exec` and `ExecRange` versions that accept functions. + // + void Exec(TLocallyExecutableFunction exec, int id, int flags); + void ExecRange(TLocallyExecutableFunction exec, int firstId, int lastId, int flags); + + // Version of `ExecRange` that throws exception from task with minimal id if at least one of + // task threw an exception. + // + void ExecRangeWithThrow(TLocallyExecutableFunction exec, int firstId, int lastId, int flags); + + // Version of `ExecRange` that returns vector of futures, thus allowing to retry any task if + // it fails. + // + TVector<NThreading::TFuture<void>> ExecRangeWithFutures(TLocallyExecutableFunction exec, int firstId, int lastId, int flags); + template <typename TBody> static inline auto BlockedLoopBody(const TExecRangeParams& params, const TBody& body) { return [=](int blockId) { - const int blockFirstId = params.FirstId + blockId * params.GetBlockSize(); - const int blockLastId = Min(params.LastId, blockFirstId + params.GetBlockSize()); + const int blockFirstId = params.FirstId + blockId * params.GetBlockSize(); + const int blockLastId = Min(params.LastId, blockFirstId + params.GetBlockSize()); for (int i = blockFirstId; i < blockLastId; ++i) { body(i); } @@ -151,10 +151,10 @@ namespace NPar { if (TryExecRangeSequentially(body, params.FirstId, params.LastId, flags)) { return; } - if (params.GetBlockEqualToThreads()) { - params.SetBlockCount(GetThreadCount() + ((flags & WAIT_COMPLETE) != 0)); // ThreadCount or ThreadCount+1 depending on WaitFlag + if (params.GetBlockEqualToThreads()) { + params.SetBlockCount(GetThreadCount() + ((flags & WAIT_COMPLETE) != 0)); // ThreadCount or ThreadCount+1 depending on WaitFlag } - ExecRange(BlockedLoopBody(params, body), 0, params.GetBlockCount(), flags); + ExecRange(BlockedLoopBody(params, body), 0, params.GetBlockCount(), flags); } template <typename TBody> @@ -269,7 +269,7 @@ namespace NPar { THolder<TImpl> Impl_; }; - static inline TLocalExecutor& LocalExecutor() { + static inline TLocalExecutor& LocalExecutor() { return *Singleton<TLocalExecutor>(); } diff --git a/library/cpp/threading/local_executor/ut/local_executor_ut.cpp b/library/cpp/threading/local_executor/ut/local_executor_ut.cpp index ac5737717c..fe7dab0899 100644 --- a/library/cpp/threading/local_executor/ut/local_executor_ut.cpp +++ b/library/cpp/threading/local_executor/ut/local_executor_ut.cpp @@ -1,10 +1,10 @@ #include <library/cpp/threading/local_executor/local_executor.h> #include <library/cpp/threading/future/future.h> - + #include <library/cpp/testing/unittest/registar.h> #include <util/system/mutex.h> #include <util/system/rwlock.h> -#include <util/generic/algorithm.h> +#include <util/generic/algorithm.h> using namespace NPar; @@ -14,7 +14,7 @@ class TTestException: public yexception { static const int DefaultThreadsCount = 41; static const int DefaultRangeSize = 999; -Y_UNIT_TEST_SUITE(ExecRangeWithFutures){ +Y_UNIT_TEST_SUITE(ExecRangeWithFutures){ bool AllOf(const TVector<int>& vec, int value){ return AllOf(vec, [value](int element) { return value == element; }); } @@ -41,23 +41,23 @@ void AsyncRunAndWaitFuturesReady(int rangeSize, int threads) { UNIT_ASSERT(AllOf(data, 1)); } -Y_UNIT_TEST(AsyncRunRangeAndWaitFuturesReady) { +Y_UNIT_TEST(AsyncRunRangeAndWaitFuturesReady) { AsyncRunAndWaitFuturesReady(DefaultRangeSize, DefaultThreadsCount); } -Y_UNIT_TEST(AsyncRunOneTaskAndWaitFuturesReady) { +Y_UNIT_TEST(AsyncRunOneTaskAndWaitFuturesReady) { AsyncRunAndWaitFuturesReady(1, DefaultThreadsCount); } -Y_UNIT_TEST(AsyncRunRangeAndWaitFuturesReadyOneExtraThread) { +Y_UNIT_TEST(AsyncRunRangeAndWaitFuturesReadyOneExtraThread) { AsyncRunAndWaitFuturesReady(DefaultRangeSize, 1); } -Y_UNIT_TEST(AsyncRunOneThreadAndWaitFuturesReadyOneExtraThread) { +Y_UNIT_TEST(AsyncRunOneThreadAndWaitFuturesReadyOneExtraThread) { AsyncRunAndWaitFuturesReady(1, 1); } -Y_UNIT_TEST(AsyncRunTwoRangesAndWaitFuturesReady) { +Y_UNIT_TEST(AsyncRunTwoRangesAndWaitFuturesReady) { TLocalExecutor localExecutor; localExecutor.RunAdditionalThreads(DefaultThreadsCount); TAtomic signal = 0; @@ -118,23 +118,23 @@ void AsyncRunRangeAndWaitExceptions(int rangeSize, int threadsCount) { UNIT_ASSERT(AllOf(data, 1)); } -Y_UNIT_TEST(AsyncRunRangeAndWaitExceptions) { +Y_UNIT_TEST(AsyncRunRangeAndWaitExceptions) { AsyncRunRangeAndWaitExceptions(DefaultRangeSize, DefaultThreadsCount); } -Y_UNIT_TEST(AsyncRunOneTaskAndWaitExceptions) { +Y_UNIT_TEST(AsyncRunOneTaskAndWaitExceptions) { AsyncRunRangeAndWaitExceptions(1, DefaultThreadsCount); } -Y_UNIT_TEST(AsyncRunRangeAndWaitExceptionsOneExtraThread) { +Y_UNIT_TEST(AsyncRunRangeAndWaitExceptionsOneExtraThread) { AsyncRunRangeAndWaitExceptions(DefaultRangeSize, 1); } -Y_UNIT_TEST(AsyncRunOneTaskAndWaitExceptionsOneExtraThread) { +Y_UNIT_TEST(AsyncRunOneTaskAndWaitExceptionsOneExtraThread) { AsyncRunRangeAndWaitExceptions(1, 1); } -Y_UNIT_TEST(AsyncRunTwoRangesAndWaitExceptions) { +Y_UNIT_TEST(AsyncRunTwoRangesAndWaitExceptions) { TLocalExecutor localExecutor; localExecutor.RunAdditionalThreads(DefaultThreadsCount); TAtomic signal = 0; @@ -209,33 +209,33 @@ void RunRangeAndCheckExceptionsWithWaitComplete(int rangeSize, int threadsCount) UNIT_ASSERT(AllOf(data, 1)); } -Y_UNIT_TEST(RunRangeAndCheckExceptionsWithWaitComplete) { +Y_UNIT_TEST(RunRangeAndCheckExceptionsWithWaitComplete) { RunRangeAndCheckExceptionsWithWaitComplete(DefaultRangeSize, DefaultThreadsCount); } -Y_UNIT_TEST(RunOneAndCheckExceptionsWithWaitComplete) { +Y_UNIT_TEST(RunOneAndCheckExceptionsWithWaitComplete) { RunRangeAndCheckExceptionsWithWaitComplete(1, DefaultThreadsCount); } -Y_UNIT_TEST(RunRangeAndCheckExceptionsWithWaitCompleteOneExtraThread) { +Y_UNIT_TEST(RunRangeAndCheckExceptionsWithWaitCompleteOneExtraThread) { RunRangeAndCheckExceptionsWithWaitComplete(DefaultRangeSize, 1); } -Y_UNIT_TEST(RunOneAndCheckExceptionsWithWaitCompleteOneExtraThread) { +Y_UNIT_TEST(RunOneAndCheckExceptionsWithWaitCompleteOneExtraThread) { RunRangeAndCheckExceptionsWithWaitComplete(1, 1); } -Y_UNIT_TEST(RunRangeAndCheckExceptionsWithWaitCompleteZeroExtraThreads) { +Y_UNIT_TEST(RunRangeAndCheckExceptionsWithWaitCompleteZeroExtraThreads) { RunRangeAndCheckExceptionsWithWaitComplete(DefaultRangeSize, 0); } -Y_UNIT_TEST(RunOneAndCheckExceptionsWithWaitCompleteZeroExtraThreads) { +Y_UNIT_TEST(RunOneAndCheckExceptionsWithWaitCompleteZeroExtraThreads) { RunRangeAndCheckExceptionsWithWaitComplete(1, 0); } } ; -Y_UNIT_TEST_SUITE(ExecRangeWithThrow){ +Y_UNIT_TEST_SUITE(ExecRangeWithThrow){ void RunParallelWhichThrowsTTestException(int rangeStart, int rangeSize, int threadsCount, int flags, TAtomic& processed){ AtomicSet(processed, 0); TLocalExecutor localExecutor; @@ -247,7 +247,7 @@ localExecutor.ExecRangeWithThrow([&processed](int) { rangeStart, rangeStart + rangeSize, flags); } -Y_UNIT_TEST(RunParallelWhichThrowsTTestException) { +Y_UNIT_TEST(RunParallelWhichThrowsTTestException) { TAtomic processed = 0; UNIT_ASSERT_EXCEPTION( RunParallelWhichThrowsTTestException(10, 40, DefaultThreadsCount, @@ -264,32 +264,32 @@ void ThrowAndCatchTTestException(int rangeSize, int threadsCount, int flags) { UNIT_ASSERT(AtomicGet(processed) == rangeSize); } -Y_UNIT_TEST(ThrowAndCatchTTestExceptionLowPriority) { +Y_UNIT_TEST(ThrowAndCatchTTestExceptionLowPriority) { ThrowAndCatchTTestException(DefaultRangeSize, DefaultThreadsCount, TLocalExecutor::EFlags::WAIT_COMPLETE | TLocalExecutor::EFlags::LOW_PRIORITY); } -Y_UNIT_TEST(ThrowAndCatchTTestExceptionMedPriority) { +Y_UNIT_TEST(ThrowAndCatchTTestExceptionMedPriority) { ThrowAndCatchTTestException(DefaultRangeSize, DefaultThreadsCount, TLocalExecutor::EFlags::WAIT_COMPLETE | TLocalExecutor::EFlags::MED_PRIORITY); } -Y_UNIT_TEST(ThrowAndCatchTTestExceptionHighPriority) { +Y_UNIT_TEST(ThrowAndCatchTTestExceptionHighPriority) { ThrowAndCatchTTestException(DefaultRangeSize, DefaultThreadsCount, TLocalExecutor::EFlags::WAIT_COMPLETE | TLocalExecutor::EFlags::HIGH_PRIORITY); } -Y_UNIT_TEST(ThrowAndCatchTTestExceptionWaitComplete) { +Y_UNIT_TEST(ThrowAndCatchTTestExceptionWaitComplete) { ThrowAndCatchTTestException(DefaultRangeSize, DefaultThreadsCount, TLocalExecutor::EFlags::WAIT_COMPLETE); } -Y_UNIT_TEST(RethrowExeptionSequentialWaitComplete) { +Y_UNIT_TEST(RethrowExeptionSequentialWaitComplete) { ThrowAndCatchTTestException(DefaultRangeSize, 0, TLocalExecutor::EFlags::WAIT_COMPLETE); } -Y_UNIT_TEST(RethrowExeptionOneExtraThreadWaitComplete) { +Y_UNIT_TEST(RethrowExeptionOneExtraThreadWaitComplete) { ThrowAndCatchTTestException(DefaultRangeSize, 1, TLocalExecutor::EFlags::WAIT_COMPLETE); } @@ -314,7 +314,7 @@ void CatchTTestExceptionFromNested(TAtomic& processed1, TAtomic& processed2) { 0, DefaultRangeSize, TLocalExecutor::EFlags::WAIT_COMPLETE); } -Y_UNIT_TEST(NestedParallelExceptionsDoNotLeak) { +Y_UNIT_TEST(NestedParallelExceptionsDoNotLeak) { TAtomic processed1 = 0; TAtomic processed2 = 0; UNIT_ASSERT_NO_EXCEPTION( diff --git a/library/cpp/threading/local_executor/ut/ya.make b/library/cpp/threading/local_executor/ut/ya.make index be579a5ca0..2983c4f466 100644 --- a/library/cpp/threading/local_executor/ut/ya.make +++ b/library/cpp/threading/local_executor/ut/ya.make @@ -1,10 +1,10 @@ OWNER( g:matrixnet - gulin -) + gulin +) UNITTEST_FOR(library/cpp/threading/local_executor) - + SRCS( local_executor_ut.cpp ) diff --git a/library/cpp/threading/local_executor/ya.make b/library/cpp/threading/local_executor/ya.make index df210f92bb..516be66703 100644 --- a/library/cpp/threading/local_executor/ya.make +++ b/library/cpp/threading/local_executor/ya.make @@ -5,8 +5,8 @@ OWNER( espetrov ) -LIBRARY() - +LIBRARY() + SRCS( local_executor.cpp tbb_local_executor.cpp |