diff options
author | Devtools Arcadia <arcadia-devtools@yandex-team.ru> | 2022-02-07 18:08:42 +0300 |
---|---|---|
committer | Devtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net> | 2022-02-07 18:08:42 +0300 |
commit | 1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch) | |
tree | e26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/threading/local_executor/local_executor.cpp | |
download | ydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz |
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/threading/local_executor/local_executor.cpp')
-rw-r--r-- | library/cpp/threading/local_executor/local_executor.cpp | 369 |
1 files changed, 369 insertions, 0 deletions
diff --git a/library/cpp/threading/local_executor/local_executor.cpp b/library/cpp/threading/local_executor/local_executor.cpp new file mode 100644 index 0000000000..1d3fbb4bf4 --- /dev/null +++ b/library/cpp/threading/local_executor/local_executor.cpp @@ -0,0 +1,369 @@ +#include "local_executor.h" + +#include <library/cpp/threading/future/future.h> + +#include <util/generic/utility.h> +#include <util/system/atomic.h> +#include <util/system/event.h> +#include <util/system/thread.h> +#include <util/system/tls.h> +#include <util/system/yield.h> +#include <util/thread/lfqueue.h> + +#include <utility> + +#ifdef _win_ +static void RegularYield() { +} +#else +// unix actually has cooperative multitasking! :) +// without this function program runs slower and system lags for some magic reason +static void RegularYield() { + SchedYield(); +} +#endif + +namespace { + struct TFunctionWrapper : NPar::ILocallyExecutable { + NPar::TLocallyExecutableFunction Exec; + TFunctionWrapper(NPar::TLocallyExecutableFunction exec) + : Exec(std::move(exec)) + { + } + void LocalExec(int id) override { + Exec(id); + } + }; + + class TFunctionWrapperWithPromise: public NPar::ILocallyExecutable { + private: + NPar::TLocallyExecutableFunction Exec; + int FirstId, LastId; + TVector<NThreading::TPromise<void>> Promises; + + public: + TFunctionWrapperWithPromise(NPar::TLocallyExecutableFunction exec, int firstId, int lastId) + : Exec(std::move(exec)) + , FirstId(firstId) + , LastId(lastId) + { + Y_ASSERT(FirstId <= LastId); + const int rangeSize = LastId - FirstId; + Promises.resize(rangeSize, NThreading::NewPromise()); + for (auto& promise : Promises) { + promise = NThreading::NewPromise(); + } + } + + void LocalExec(int id) override { + Y_ASSERT(FirstId <= id && id < LastId); + NThreading::NImpl::SetValue(Promises[id - FirstId], [=] { Exec(id); }); + } + + TVector<NThreading::TFuture<void>> GetFutures() const { + TVector<NThreading::TFuture<void>> out; + out.reserve(Promises.ysize()); + for (auto& promise : Promises) { + out.push_back(promise.GetFuture()); + } + return out; + } + }; + + struct TSingleJob { + TIntrusivePtr<NPar::ILocallyExecutable> Exec; + int Id{0}; + + TSingleJob() = default; + TSingleJob(TIntrusivePtr<NPar::ILocallyExecutable> exec, int id) + : Exec(std::move(exec)) + , Id(id) + { + } + }; + + class TLocalRangeExecutor: public NPar::ILocallyExecutable { + TIntrusivePtr<NPar::ILocallyExecutable> Exec; + alignas(64) TAtomic Counter; + alignas(64) TAtomic WorkerCount; + int LastId; + + void LocalExec(int) override { + AtomicAdd(WorkerCount, 1); + for (;;) { + if (!DoSingleOp()) + break; + } + AtomicAdd(WorkerCount, -1); + } + + public: + TLocalRangeExecutor(TIntrusivePtr<ILocallyExecutable> exec, int firstId, int lastId) + : Exec(std::move(exec)) + , Counter(firstId) + , WorkerCount(0) + , LastId(lastId) + { + } + bool DoSingleOp() { + const int id = AtomicAdd(Counter, 1) - 1; + if (id >= LastId) + return false; + Exec->LocalExec(id); + RegularYield(); + return true; + } + void WaitComplete() { + while (AtomicGet(WorkerCount) > 0) + RegularYield(); + } + int GetRangeSize() const { + return Max<int>(LastId - Counter, 0); + } + }; + +} + +////////////////////////////////////////////////////////////////////////// +class NPar::TLocalExecutor::TImpl { +public: + TLockFreeQueue<TSingleJob> JobQueue; + TLockFreeQueue<TSingleJob> MedJobQueue; + TLockFreeQueue<TSingleJob> LowJobQueue; + alignas(64) TSystemEvent HasJob; + + TAtomic ThreadCount{0}; + alignas(64) TAtomic QueueSize{0}; + TAtomic MPQueueSize{0}; + TAtomic LPQueueSize{0}; + TAtomic ThreadId{0}; + + Y_THREAD(int) + CurrentTaskPriority; + Y_THREAD(int) + WorkerThreadId; + + static void* HostWorkerThread(void* p); + bool GetJob(TSingleJob* job); + void RunNewThread(); + void LaunchRange(TIntrusivePtr<TLocalRangeExecutor> execRange, int queueSizeLimit, + TAtomic* queueSize, TLockFreeQueue<TSingleJob>* jobQueue); + + TImpl() = default; + ~TImpl(); +}; + +NPar::TLocalExecutor::TImpl::~TImpl() { + AtomicAdd(QueueSize, 1); + JobQueue.Enqueue(TSingleJob(nullptr, 0)); + HasJob.Signal(); + while (AtomicGet(ThreadCount)) { + ThreadYield(); + } +} + +void* NPar::TLocalExecutor::TImpl::HostWorkerThread(void* p) { + static const int FAST_ITERATIONS = 200; + + auto* const ctx = (TImpl*)p; + TThread::SetCurrentThreadName("ParLocalExecutor"); + ctx->WorkerThreadId = AtomicAdd(ctx->ThreadId, 1); + for (bool cont = true; cont;) { + TSingleJob job; + bool gotJob = false; + for (int iter = 0; iter < FAST_ITERATIONS; ++iter) { + if (ctx->GetJob(&job)) { + gotJob = true; + break; + } + } + if (!gotJob) { + ctx->HasJob.Reset(); + if (!ctx->GetJob(&job)) { + ctx->HasJob.Wait(); + continue; + } + } + if (job.Exec.Get()) { + job.Exec->LocalExec(job.Id); + RegularYield(); + } else { + AtomicAdd(ctx->QueueSize, 1); + ctx->JobQueue.Enqueue(job); + ctx->HasJob.Signal(); + cont = false; + } + } + AtomicAdd(ctx->ThreadCount, -1); + return nullptr; +} + +bool NPar::TLocalExecutor::TImpl::GetJob(TSingleJob* job) { + if (JobQueue.Dequeue(job)) { + CurrentTaskPriority = TLocalExecutor::HIGH_PRIORITY; + AtomicAdd(QueueSize, -1); + return true; + } else if (MedJobQueue.Dequeue(job)) { + CurrentTaskPriority = TLocalExecutor::MED_PRIORITY; + AtomicAdd(MPQueueSize, -1); + return true; + } else if (LowJobQueue.Dequeue(job)) { + CurrentTaskPriority = TLocalExecutor::LOW_PRIORITY; + AtomicAdd(LPQueueSize, -1); + return true; + } + return false; +} + +void NPar::TLocalExecutor::TImpl::RunNewThread() { + AtomicAdd(ThreadCount, 1); + TThread thr(HostWorkerThread, this); + thr.Start(); + thr.Detach(); +} + +void NPar::TLocalExecutor::TImpl::LaunchRange(TIntrusivePtr<TLocalRangeExecutor> rangeExec, + int queueSizeLimit, + TAtomic* queueSize, + TLockFreeQueue<TSingleJob>* jobQueue) { + int count = Min<int>(ThreadCount + 1, rangeExec->GetRangeSize()); + if (queueSizeLimit >= 0 && AtomicGet(*queueSize) >= queueSizeLimit) { + return; + } + AtomicAdd(*queueSize, count); + jobQueue->EnqueueAll(TVector<TSingleJob>{size_t(count), TSingleJob(rangeExec, 0)}); + HasJob.Signal(); +} + +NPar::TLocalExecutor::TLocalExecutor() + : Impl_{MakeHolder<TImpl>()} { +} + +NPar::TLocalExecutor::~TLocalExecutor() = default; + +void NPar::TLocalExecutor::RunAdditionalThreads(int threadCount) { + for (int i = 0; i < threadCount; i++) + Impl_->RunNewThread(); +} + +void NPar::TLocalExecutor::Exec(TIntrusivePtr<ILocallyExecutable> exec, int id, int flags) { + Y_ASSERT((flags & WAIT_COMPLETE) == 0); // unsupported + int prior = Max<int>(Impl_->CurrentTaskPriority, flags & PRIORITY_MASK); + switch (prior) { + case HIGH_PRIORITY: + AtomicAdd(Impl_->QueueSize, 1); + Impl_->JobQueue.Enqueue(TSingleJob(std::move(exec), id)); + break; + case MED_PRIORITY: + AtomicAdd(Impl_->MPQueueSize, 1); + Impl_->MedJobQueue.Enqueue(TSingleJob(std::move(exec), id)); + break; + case LOW_PRIORITY: + AtomicAdd(Impl_->LPQueueSize, 1); + Impl_->LowJobQueue.Enqueue(TSingleJob(std::move(exec), id)); + break; + default: + Y_ASSERT(0); + break; + } + Impl_->HasJob.Signal(); +} + +void NPar::ILocalExecutor::Exec(TLocallyExecutableFunction exec, int id, int flags) { + Exec(new TFunctionWrapper(std::move(exec)), id, flags); +} + +void NPar::TLocalExecutor::ExecRange(TIntrusivePtr<ILocallyExecutable> exec, int firstId, int lastId, int flags) { + Y_ASSERT(lastId >= firstId); + if (TryExecRangeSequentially([=] (int id) { exec->LocalExec(id); }, firstId, lastId, flags)) { + return; + } + auto rangeExec = MakeIntrusive<TLocalRangeExecutor>(std::move(exec), firstId, lastId); + int queueSizeLimit = (flags & WAIT_COMPLETE) ? 10000 : -1; + int prior = Max<int>(Impl_->CurrentTaskPriority, flags & PRIORITY_MASK); + switch (prior) { + case HIGH_PRIORITY: + Impl_->LaunchRange(rangeExec, queueSizeLimit, &Impl_->QueueSize, &Impl_->JobQueue); + break; + case MED_PRIORITY: + Impl_->LaunchRange(rangeExec, queueSizeLimit, &Impl_->MPQueueSize, &Impl_->MedJobQueue); + break; + case LOW_PRIORITY: + Impl_->LaunchRange(rangeExec, queueSizeLimit, &Impl_->LPQueueSize, &Impl_->LowJobQueue); + break; + default: + Y_ASSERT(0); + break; + } + if (flags & WAIT_COMPLETE) { + int keepPrior = Impl_->CurrentTaskPriority; + Impl_->CurrentTaskPriority = prior; + while (rangeExec->DoSingleOp()) { + } + Impl_->CurrentTaskPriority = keepPrior; + rangeExec->WaitComplete(); + } +} + +void NPar::ILocalExecutor::ExecRange(TLocallyExecutableFunction exec, int firstId, int lastId, int flags) { + if (TryExecRangeSequentially(exec, firstId, lastId, flags)) { + return; + } + ExecRange(new TFunctionWrapper(exec), firstId, lastId, flags); +} + +void NPar::ILocalExecutor::ExecRangeWithThrow(TLocallyExecutableFunction exec, int firstId, int lastId, int flags) { + Y_VERIFY((flags & WAIT_COMPLETE) != 0, "ExecRangeWithThrow() requires WAIT_COMPLETE to wait if exceptions arise."); + if (TryExecRangeSequentially(exec, firstId, lastId, flags)) { + return; + } + TVector<NThreading::TFuture<void>> currentRun = ExecRangeWithFutures(exec, firstId, lastId, flags); + for (auto& result : currentRun) { + result.GetValueSync(); // Exception will be rethrown if exists. If several exception - only the one with minimal id is rethrown. + } +} + +TVector<NThreading::TFuture<void>> +NPar::ILocalExecutor::ExecRangeWithFutures(TLocallyExecutableFunction exec, int firstId, int lastId, int flags) { + TFunctionWrapperWithPromise* execWrapper = new TFunctionWrapperWithPromise(exec, firstId, lastId); + TVector<NThreading::TFuture<void>> out = execWrapper->GetFutures(); + ExecRange(execWrapper, firstId, lastId, flags); + return out; +} + +void NPar::TLocalExecutor::ClearLPQueue() { + for (bool cont = true; cont;) { + cont = false; + TSingleJob job; + while (Impl_->LowJobQueue.Dequeue(&job)) { + AtomicAdd(Impl_->LPQueueSize, -1); + cont = true; + } + while (Impl_->MedJobQueue.Dequeue(&job)) { + AtomicAdd(Impl_->MPQueueSize, -1); + cont = true; + } + } +} + +int NPar::TLocalExecutor::GetQueueSize() const noexcept { + return AtomicGet(Impl_->QueueSize); +} + +int NPar::TLocalExecutor::GetMPQueueSize() const noexcept { + return AtomicGet(Impl_->MPQueueSize); +} + +int NPar::TLocalExecutor::GetLPQueueSize() const noexcept { + return AtomicGet(Impl_->LPQueueSize); +} + +int NPar::TLocalExecutor::GetWorkerThreadId() const noexcept { + return Impl_->WorkerThreadId; +} + +int NPar::TLocalExecutor::GetThreadCount() const noexcept { + return AtomicGet(Impl_->ThreadCount); +} + +////////////////////////////////////////////////////////////////////////// |