diff options
author | yazevnul <yazevnul@yandex-team.ru> | 2022-02-10 16:46:46 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:46:46 +0300 |
commit | 8cbc307de0221f84c80c42dcbe07d40727537e2c (patch) | |
tree | 625d5a673015d1df891e051033e9fcde5c7be4e5 /library/cpp/threading/local_executor/local_executor.cpp | |
parent | 30d1ef3941e0dc835be7609de5ebee66958f215a (diff) | |
download | ydb-8cbc307de0221f84c80c42dcbe07d40727537e2c.tar.gz |
Restoring authorship annotation for <yazevnul@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/threading/local_executor/local_executor.cpp')
-rw-r--r-- | library/cpp/threading/local_executor/local_executor.cpp | 568 |
1 files changed, 284 insertions, 284 deletions
diff --git a/library/cpp/threading/local_executor/local_executor.cpp b/library/cpp/threading/local_executor/local_executor.cpp index 1d3fbb4bf4..6e62d09d85 100644 --- a/library/cpp/threading/local_executor/local_executor.cpp +++ b/library/cpp/threading/local_executor/local_executor.cpp @@ -1,17 +1,17 @@ #include "local_executor.h" #include <library/cpp/threading/future/future.h> - -#include <util/generic/utility.h> -#include <util/system/atomic.h> -#include <util/system/event.h> + +#include <util/generic/utility.h> +#include <util/system/atomic.h> +#include <util/system/event.h> #include <util/system/thread.h> -#include <util/system/tls.h> +#include <util/system/tls.h> #include <util/system/yield.h> -#include <util/thread/lfqueue.h> - -#include <utility> +#include <util/thread/lfqueue.h> +#include <utility> + #ifdef _win_ static void RegularYield() { } @@ -23,11 +23,11 @@ static void RegularYield() { } #endif -namespace { - struct TFunctionWrapper : NPar::ILocallyExecutable { - NPar::TLocallyExecutableFunction Exec; - TFunctionWrapper(NPar::TLocallyExecutableFunction exec) - : Exec(std::move(exec)) +namespace { + struct TFunctionWrapper : NPar::ILocallyExecutable { + NPar::TLocallyExecutableFunction Exec; + TFunctionWrapper(NPar::TLocallyExecutableFunction exec) + : Exec(std::move(exec)) { } void LocalExec(int id) override { @@ -35,15 +35,15 @@ namespace { } }; - class TFunctionWrapperWithPromise: public NPar::ILocallyExecutable { + class TFunctionWrapperWithPromise: public NPar::ILocallyExecutable { private: - NPar::TLocallyExecutableFunction Exec; + NPar::TLocallyExecutableFunction Exec; int FirstId, LastId; TVector<NThreading::TPromise<void>> Promises; public: - TFunctionWrapperWithPromise(NPar::TLocallyExecutableFunction exec, int firstId, int lastId) - : Exec(std::move(exec)) + TFunctionWrapperWithPromise(NPar::TLocallyExecutableFunction exec, int firstId, int lastId) + : Exec(std::move(exec)) , FirstId(firstId) , LastId(lastId) { @@ -70,300 +70,300 @@ namespace { } }; - struct TSingleJob { - TIntrusivePtr<NPar::ILocallyExecutable> Exec; - int Id{0}; + struct TSingleJob { + TIntrusivePtr<NPar::ILocallyExecutable> Exec; + int Id{0}; - TSingleJob() = default; - TSingleJob(TIntrusivePtr<NPar::ILocallyExecutable> exec, int id) - : Exec(std::move(exec)) - , Id(id) - { + TSingleJob() = default; + TSingleJob(TIntrusivePtr<NPar::ILocallyExecutable> exec, int id) + : Exec(std::move(exec)) + , Id(id) + { } - }; + }; - class TLocalRangeExecutor: public NPar::ILocallyExecutable { - TIntrusivePtr<NPar::ILocallyExecutable> Exec; + class TLocalRangeExecutor: public NPar::ILocallyExecutable { + TIntrusivePtr<NPar::ILocallyExecutable> Exec; alignas(64) TAtomic Counter; alignas(64) TAtomic WorkerCount; - int LastId; - - void LocalExec(int) override { - AtomicAdd(WorkerCount, 1); - for (;;) { - if (!DoSingleOp()) - break; - } - AtomicAdd(WorkerCount, -1); + int LastId; + + void LocalExec(int) override { + AtomicAdd(WorkerCount, 1); + for (;;) { + if (!DoSingleOp()) + break; + } + AtomicAdd(WorkerCount, -1); } - public: - TLocalRangeExecutor(TIntrusivePtr<ILocallyExecutable> exec, int firstId, int lastId) - : Exec(std::move(exec)) - , Counter(firstId) - , WorkerCount(0) - , LastId(lastId) - { + public: + TLocalRangeExecutor(TIntrusivePtr<ILocallyExecutable> exec, int firstId, int lastId) + : Exec(std::move(exec)) + , Counter(firstId) + , WorkerCount(0) + , LastId(lastId) + { } - bool DoSingleOp() { + bool DoSingleOp() { const int id = AtomicAdd(Counter, 1) - 1; - if (id >= LastId) - return false; - Exec->LocalExec(id); - RegularYield(); - return true; + if (id >= LastId) + return false; + Exec->LocalExec(id); + RegularYield(); + return true; } - void WaitComplete() { - while (AtomicGet(WorkerCount) > 0) - RegularYield(); + void WaitComplete() { + while (AtomicGet(WorkerCount) > 0) + RegularYield(); } - int GetRangeSize() const { - return Max<int>(LastId - Counter, 0); + int GetRangeSize() const { + return Max<int>(LastId - Counter, 0); } - }; + }; } - -////////////////////////////////////////////////////////////////////////// -class NPar::TLocalExecutor::TImpl { -public: - TLockFreeQueue<TSingleJob> JobQueue; - TLockFreeQueue<TSingleJob> MedJobQueue; - TLockFreeQueue<TSingleJob> LowJobQueue; + +////////////////////////////////////////////////////////////////////////// +class NPar::TLocalExecutor::TImpl { +public: + TLockFreeQueue<TSingleJob> JobQueue; + TLockFreeQueue<TSingleJob> MedJobQueue; + TLockFreeQueue<TSingleJob> LowJobQueue; alignas(64) TSystemEvent HasJob; - - TAtomic ThreadCount{0}; + + TAtomic ThreadCount{0}; alignas(64) TAtomic QueueSize{0}; - TAtomic MPQueueSize{0}; - TAtomic LPQueueSize{0}; - TAtomic ThreadId{0}; - - Y_THREAD(int) - CurrentTaskPriority; - Y_THREAD(int) - WorkerThreadId; - - static void* HostWorkerThread(void* p); - bool GetJob(TSingleJob* job); - void RunNewThread(); - void LaunchRange(TIntrusivePtr<TLocalRangeExecutor> execRange, int queueSizeLimit, - TAtomic* queueSize, TLockFreeQueue<TSingleJob>* jobQueue); - - TImpl() = default; - ~TImpl(); -}; - -NPar::TLocalExecutor::TImpl::~TImpl() { - AtomicAdd(QueueSize, 1); - JobQueue.Enqueue(TSingleJob(nullptr, 0)); - HasJob.Signal(); - while (AtomicGet(ThreadCount)) { - ThreadYield(); - } -} - -void* NPar::TLocalExecutor::TImpl::HostWorkerThread(void* p) { - static const int FAST_ITERATIONS = 200; - - auto* const ctx = (TImpl*)p; + TAtomic MPQueueSize{0}; + TAtomic LPQueueSize{0}; + TAtomic ThreadId{0}; + + Y_THREAD(int) + CurrentTaskPriority; + Y_THREAD(int) + WorkerThreadId; + + static void* HostWorkerThread(void* p); + bool GetJob(TSingleJob* job); + void RunNewThread(); + void LaunchRange(TIntrusivePtr<TLocalRangeExecutor> execRange, int queueSizeLimit, + TAtomic* queueSize, TLockFreeQueue<TSingleJob>* jobQueue); + + TImpl() = default; + ~TImpl(); +}; + +NPar::TLocalExecutor::TImpl::~TImpl() { + AtomicAdd(QueueSize, 1); + JobQueue.Enqueue(TSingleJob(nullptr, 0)); + HasJob.Signal(); + while (AtomicGet(ThreadCount)) { + ThreadYield(); + } +} + +void* NPar::TLocalExecutor::TImpl::HostWorkerThread(void* p) { + static const int FAST_ITERATIONS = 200; + + auto* const ctx = (TImpl*)p; TThread::SetCurrentThreadName("ParLocalExecutor"); - ctx->WorkerThreadId = AtomicAdd(ctx->ThreadId, 1); - for (bool cont = true; cont;) { - TSingleJob job; - bool gotJob = false; - for (int iter = 0; iter < FAST_ITERATIONS; ++iter) { - if (ctx->GetJob(&job)) { - gotJob = true; - break; - } - } - if (!gotJob) { - ctx->HasJob.Reset(); - if (!ctx->GetJob(&job)) { - ctx->HasJob.Wait(); - continue; - } - } - if (job.Exec.Get()) { - job.Exec->LocalExec(job.Id); - RegularYield(); - } else { - AtomicAdd(ctx->QueueSize, 1); - ctx->JobQueue.Enqueue(job); - ctx->HasJob.Signal(); - cont = false; - } - } - AtomicAdd(ctx->ThreadCount, -1); - return nullptr; -} - -bool NPar::TLocalExecutor::TImpl::GetJob(TSingleJob* job) { - if (JobQueue.Dequeue(job)) { - CurrentTaskPriority = TLocalExecutor::HIGH_PRIORITY; - AtomicAdd(QueueSize, -1); - return true; - } else if (MedJobQueue.Dequeue(job)) { - CurrentTaskPriority = TLocalExecutor::MED_PRIORITY; - AtomicAdd(MPQueueSize, -1); - return true; - } else if (LowJobQueue.Dequeue(job)) { - CurrentTaskPriority = TLocalExecutor::LOW_PRIORITY; - AtomicAdd(LPQueueSize, -1); - return true; - } - return false; -} - -void NPar::TLocalExecutor::TImpl::RunNewThread() { - AtomicAdd(ThreadCount, 1); - TThread thr(HostWorkerThread, this); - thr.Start(); - thr.Detach(); -} - -void NPar::TLocalExecutor::TImpl::LaunchRange(TIntrusivePtr<TLocalRangeExecutor> rangeExec, - int queueSizeLimit, - TAtomic* queueSize, - TLockFreeQueue<TSingleJob>* jobQueue) { - int count = Min<int>(ThreadCount + 1, rangeExec->GetRangeSize()); - if (queueSizeLimit >= 0 && AtomicGet(*queueSize) >= queueSizeLimit) { - return; - } - AtomicAdd(*queueSize, count); + ctx->WorkerThreadId = AtomicAdd(ctx->ThreadId, 1); + for (bool cont = true; cont;) { + TSingleJob job; + bool gotJob = false; + for (int iter = 0; iter < FAST_ITERATIONS; ++iter) { + if (ctx->GetJob(&job)) { + gotJob = true; + break; + } + } + if (!gotJob) { + ctx->HasJob.Reset(); + if (!ctx->GetJob(&job)) { + ctx->HasJob.Wait(); + continue; + } + } + if (job.Exec.Get()) { + job.Exec->LocalExec(job.Id); + RegularYield(); + } else { + AtomicAdd(ctx->QueueSize, 1); + ctx->JobQueue.Enqueue(job); + ctx->HasJob.Signal(); + cont = false; + } + } + AtomicAdd(ctx->ThreadCount, -1); + return nullptr; +} + +bool NPar::TLocalExecutor::TImpl::GetJob(TSingleJob* job) { + if (JobQueue.Dequeue(job)) { + CurrentTaskPriority = TLocalExecutor::HIGH_PRIORITY; + AtomicAdd(QueueSize, -1); + return true; + } else if (MedJobQueue.Dequeue(job)) { + CurrentTaskPriority = TLocalExecutor::MED_PRIORITY; + AtomicAdd(MPQueueSize, -1); + return true; + } else if (LowJobQueue.Dequeue(job)) { + CurrentTaskPriority = TLocalExecutor::LOW_PRIORITY; + AtomicAdd(LPQueueSize, -1); + return true; + } + return false; +} + +void NPar::TLocalExecutor::TImpl::RunNewThread() { + AtomicAdd(ThreadCount, 1); + TThread thr(HostWorkerThread, this); + thr.Start(); + thr.Detach(); +} + +void NPar::TLocalExecutor::TImpl::LaunchRange(TIntrusivePtr<TLocalRangeExecutor> rangeExec, + int queueSizeLimit, + TAtomic* queueSize, + TLockFreeQueue<TSingleJob>* jobQueue) { + int count = Min<int>(ThreadCount + 1, rangeExec->GetRangeSize()); + if (queueSizeLimit >= 0 && AtomicGet(*queueSize) >= queueSizeLimit) { + return; + } + AtomicAdd(*queueSize, count); jobQueue->EnqueueAll(TVector<TSingleJob>{size_t(count), TSingleJob(rangeExec, 0)}); - HasJob.Signal(); -} - -NPar::TLocalExecutor::TLocalExecutor() - : Impl_{MakeHolder<TImpl>()} { -} - -NPar::TLocalExecutor::~TLocalExecutor() = default; - -void NPar::TLocalExecutor::RunAdditionalThreads(int threadCount) { - for (int i = 0; i < threadCount; i++) - Impl_->RunNewThread(); -} - -void NPar::TLocalExecutor::Exec(TIntrusivePtr<ILocallyExecutable> exec, int id, int flags) { - Y_ASSERT((flags & WAIT_COMPLETE) == 0); // unsupported - int prior = Max<int>(Impl_->CurrentTaskPriority, flags & PRIORITY_MASK); - switch (prior) { - case HIGH_PRIORITY: - AtomicAdd(Impl_->QueueSize, 1); - Impl_->JobQueue.Enqueue(TSingleJob(std::move(exec), id)); - break; - case MED_PRIORITY: - AtomicAdd(Impl_->MPQueueSize, 1); - Impl_->MedJobQueue.Enqueue(TSingleJob(std::move(exec), id)); - break; - case LOW_PRIORITY: - AtomicAdd(Impl_->LPQueueSize, 1); - Impl_->LowJobQueue.Enqueue(TSingleJob(std::move(exec), id)); - break; - default: - Y_ASSERT(0); - break; - } - Impl_->HasJob.Signal(); -} - + HasJob.Signal(); +} + +NPar::TLocalExecutor::TLocalExecutor() + : Impl_{MakeHolder<TImpl>()} { +} + +NPar::TLocalExecutor::~TLocalExecutor() = default; + +void NPar::TLocalExecutor::RunAdditionalThreads(int threadCount) { + for (int i = 0; i < threadCount; i++) + Impl_->RunNewThread(); +} + +void NPar::TLocalExecutor::Exec(TIntrusivePtr<ILocallyExecutable> exec, int id, int flags) { + Y_ASSERT((flags & WAIT_COMPLETE) == 0); // unsupported + int prior = Max<int>(Impl_->CurrentTaskPriority, flags & PRIORITY_MASK); + switch (prior) { + case HIGH_PRIORITY: + AtomicAdd(Impl_->QueueSize, 1); + Impl_->JobQueue.Enqueue(TSingleJob(std::move(exec), id)); + break; + case MED_PRIORITY: + AtomicAdd(Impl_->MPQueueSize, 1); + Impl_->MedJobQueue.Enqueue(TSingleJob(std::move(exec), id)); + break; + case LOW_PRIORITY: + AtomicAdd(Impl_->LPQueueSize, 1); + Impl_->LowJobQueue.Enqueue(TSingleJob(std::move(exec), id)); + break; + default: + Y_ASSERT(0); + break; + } + Impl_->HasJob.Signal(); +} + void NPar::ILocalExecutor::Exec(TLocallyExecutableFunction exec, int id, int flags) { - Exec(new TFunctionWrapper(std::move(exec)), id, flags); -} - -void NPar::TLocalExecutor::ExecRange(TIntrusivePtr<ILocallyExecutable> exec, int firstId, int lastId, int flags) { - Y_ASSERT(lastId >= firstId); + Exec(new TFunctionWrapper(std::move(exec)), id, flags); +} + +void NPar::TLocalExecutor::ExecRange(TIntrusivePtr<ILocallyExecutable> exec, int firstId, int lastId, int flags) { + Y_ASSERT(lastId >= firstId); if (TryExecRangeSequentially([=] (int id) { exec->LocalExec(id); }, firstId, lastId, flags)) { - return; - } - auto rangeExec = MakeIntrusive<TLocalRangeExecutor>(std::move(exec), firstId, lastId); - int queueSizeLimit = (flags & WAIT_COMPLETE) ? 10000 : -1; - int prior = Max<int>(Impl_->CurrentTaskPriority, flags & PRIORITY_MASK); - switch (prior) { - case HIGH_PRIORITY: - Impl_->LaunchRange(rangeExec, queueSizeLimit, &Impl_->QueueSize, &Impl_->JobQueue); - break; - case MED_PRIORITY: - Impl_->LaunchRange(rangeExec, queueSizeLimit, &Impl_->MPQueueSize, &Impl_->MedJobQueue); - break; - case LOW_PRIORITY: - Impl_->LaunchRange(rangeExec, queueSizeLimit, &Impl_->LPQueueSize, &Impl_->LowJobQueue); - break; - default: - Y_ASSERT(0); - break; - } - if (flags & WAIT_COMPLETE) { - int keepPrior = Impl_->CurrentTaskPriority; - Impl_->CurrentTaskPriority = prior; - while (rangeExec->DoSingleOp()) { - } - Impl_->CurrentTaskPriority = keepPrior; - rangeExec->WaitComplete(); - } -} - + return; + } + auto rangeExec = MakeIntrusive<TLocalRangeExecutor>(std::move(exec), firstId, lastId); + int queueSizeLimit = (flags & WAIT_COMPLETE) ? 10000 : -1; + int prior = Max<int>(Impl_->CurrentTaskPriority, flags & PRIORITY_MASK); + switch (prior) { + case HIGH_PRIORITY: + Impl_->LaunchRange(rangeExec, queueSizeLimit, &Impl_->QueueSize, &Impl_->JobQueue); + break; + case MED_PRIORITY: + Impl_->LaunchRange(rangeExec, queueSizeLimit, &Impl_->MPQueueSize, &Impl_->MedJobQueue); + break; + case LOW_PRIORITY: + Impl_->LaunchRange(rangeExec, queueSizeLimit, &Impl_->LPQueueSize, &Impl_->LowJobQueue); + break; + default: + Y_ASSERT(0); + break; + } + if (flags & WAIT_COMPLETE) { + int keepPrior = Impl_->CurrentTaskPriority; + Impl_->CurrentTaskPriority = prior; + while (rangeExec->DoSingleOp()) { + } + Impl_->CurrentTaskPriority = keepPrior; + rangeExec->WaitComplete(); + } +} + void NPar::ILocalExecutor::ExecRange(TLocallyExecutableFunction exec, int firstId, int lastId, int flags) { if (TryExecRangeSequentially(exec, firstId, lastId, flags)) { return; } - ExecRange(new TFunctionWrapper(exec), firstId, lastId, flags); -} - + ExecRange(new TFunctionWrapper(exec), firstId, lastId, flags); +} + void NPar::ILocalExecutor::ExecRangeWithThrow(TLocallyExecutableFunction exec, int firstId, int lastId, int flags) { - Y_VERIFY((flags & WAIT_COMPLETE) != 0, "ExecRangeWithThrow() requires WAIT_COMPLETE to wait if exceptions arise."); + Y_VERIFY((flags & WAIT_COMPLETE) != 0, "ExecRangeWithThrow() requires WAIT_COMPLETE to wait if exceptions arise."); if (TryExecRangeSequentially(exec, firstId, lastId, flags)) { return; } - TVector<NThreading::TFuture<void>> currentRun = ExecRangeWithFutures(exec, firstId, lastId, flags); - for (auto& result : currentRun) { - result.GetValueSync(); // Exception will be rethrown if exists. If several exception - only the one with minimal id is rethrown. - } -} - -TVector<NThreading::TFuture<void>> + TVector<NThreading::TFuture<void>> currentRun = ExecRangeWithFutures(exec, firstId, lastId, flags); + for (auto& result : currentRun) { + result.GetValueSync(); // Exception will be rethrown if exists. If several exception - only the one with minimal id is rethrown. + } +} + +TVector<NThreading::TFuture<void>> NPar::ILocalExecutor::ExecRangeWithFutures(TLocallyExecutableFunction exec, int firstId, int lastId, int flags) { - TFunctionWrapperWithPromise* execWrapper = new TFunctionWrapperWithPromise(exec, firstId, lastId); - TVector<NThreading::TFuture<void>> out = execWrapper->GetFutures(); - ExecRange(execWrapper, firstId, lastId, flags); - return out; -} - -void NPar::TLocalExecutor::ClearLPQueue() { - for (bool cont = true; cont;) { - cont = false; - TSingleJob job; - while (Impl_->LowJobQueue.Dequeue(&job)) { - AtomicAdd(Impl_->LPQueueSize, -1); - cont = true; - } - while (Impl_->MedJobQueue.Dequeue(&job)) { - AtomicAdd(Impl_->MPQueueSize, -1); - cont = true; - } - } -} - -int NPar::TLocalExecutor::GetQueueSize() const noexcept { - return AtomicGet(Impl_->QueueSize); -} - -int NPar::TLocalExecutor::GetMPQueueSize() const noexcept { - return AtomicGet(Impl_->MPQueueSize); -} - -int NPar::TLocalExecutor::GetLPQueueSize() const noexcept { - return AtomicGet(Impl_->LPQueueSize); -} - + TFunctionWrapperWithPromise* execWrapper = new TFunctionWrapperWithPromise(exec, firstId, lastId); + TVector<NThreading::TFuture<void>> out = execWrapper->GetFutures(); + ExecRange(execWrapper, firstId, lastId, flags); + return out; +} + +void NPar::TLocalExecutor::ClearLPQueue() { + for (bool cont = true; cont;) { + cont = false; + TSingleJob job; + while (Impl_->LowJobQueue.Dequeue(&job)) { + AtomicAdd(Impl_->LPQueueSize, -1); + cont = true; + } + while (Impl_->MedJobQueue.Dequeue(&job)) { + AtomicAdd(Impl_->MPQueueSize, -1); + cont = true; + } + } +} + +int NPar::TLocalExecutor::GetQueueSize() const noexcept { + return AtomicGet(Impl_->QueueSize); +} + +int NPar::TLocalExecutor::GetMPQueueSize() const noexcept { + return AtomicGet(Impl_->MPQueueSize); +} + +int NPar::TLocalExecutor::GetLPQueueSize() const noexcept { + return AtomicGet(Impl_->LPQueueSize); +} + int NPar::TLocalExecutor::GetWorkerThreadId() const noexcept { - return Impl_->WorkerThreadId; -} - -int NPar::TLocalExecutor::GetThreadCount() const noexcept { - return AtomicGet(Impl_->ThreadCount); -} - -////////////////////////////////////////////////////////////////////////// + return Impl_->WorkerThreadId; +} + +int NPar::TLocalExecutor::GetThreadCount() const noexcept { + return AtomicGet(Impl_->ThreadCount); +} + +////////////////////////////////////////////////////////////////////////// |