diff options
Diffstat (limited to 'library/cpp/threading/local_executor/local_executor.cpp')
| -rw-r--r-- | library/cpp/threading/local_executor/local_executor.cpp | 70 |
1 files changed, 35 insertions, 35 deletions
diff --git a/library/cpp/threading/local_executor/local_executor.cpp b/library/cpp/threading/local_executor/local_executor.cpp index af998300bff..95326cba334 100644 --- a/library/cpp/threading/local_executor/local_executor.cpp +++ b/library/cpp/threading/local_executor/local_executor.cpp @@ -3,13 +3,13 @@ #include <library/cpp/threading/future/future.h> #include <util/generic/utility.h> -#include <library/cpp/deprecated/atomic/atomic.h> #include <util/system/event.h> #include <util/system/thread.h> #include <util/system/tls.h> #include <util/system/yield.h> #include <util/thread/lfqueue.h> +#include <atomic> #include <utility> #ifdef _win_ @@ -84,17 +84,17 @@ namespace { class TLocalRangeExecutor: public NPar::ILocallyExecutable { TIntrusivePtr<NPar::ILocallyExecutable> Exec; - alignas(64) TAtomic Counter; - alignas(64) TAtomic WorkerCount; + alignas(64) std::atomic<int> Counter; + alignas(64) std::atomic<int> WorkerCount; int LastId; void LocalExec(int) override { - AtomicAdd(WorkerCount, 1); + ++WorkerCount; for (;;) { if (!DoSingleOp()) break; } - AtomicAdd(WorkerCount, -1); + --WorkerCount; } public: @@ -106,7 +106,7 @@ namespace { { } bool DoSingleOp() { - const int id = AtomicAdd(Counter, 1) - 1; + const int id = Counter.fetch_add(1); if (id >= LastId) return false; Exec->LocalExec(id); @@ -114,11 +114,11 @@ namespace { return true; } void WaitComplete() { - while (AtomicGet(WorkerCount) > 0) + while (WorkerCount.load() > 0) RegularYield(); } int GetRangeSize() const { - return Max<int>(LastId - Counter, 0); + return Max<int>(LastId - Counter.load(), 0); } }; @@ -132,11 +132,11 @@ public: TLockFreeQueue<TSingleJob> LowJobQueue; alignas(64) TSystemEvent HasJob; - TAtomic ThreadCount{0}; - alignas(64) TAtomic QueueSize{0}; - TAtomic MPQueueSize{0}; - TAtomic LPQueueSize{0}; - TAtomic ThreadId{0}; + std::atomic<int> ThreadCount{0}; + alignas(64) std::atomic<int> QueueSize{0}; + std::atomic<int> MPQueueSize{0}; + std::atomic<int> LPQueueSize{0}; + std::atomic<int> ThreadId{0}; Y_THREAD(int) CurrentTaskPriority; @@ -147,17 +147,17 @@ public: bool GetJob(TSingleJob* job); void RunNewThread(); void LaunchRange(TIntrusivePtr<TLocalRangeExecutor> execRange, int queueSizeLimit, - TAtomic* queueSize, TLockFreeQueue<TSingleJob>* jobQueue); + std::atomic<int>* queueSize, TLockFreeQueue<TSingleJob>* jobQueue); TImpl() = default; ~TImpl(); }; NPar::TLocalExecutor::TImpl::~TImpl() { - AtomicAdd(QueueSize, 1); + ++QueueSize; JobQueue.Enqueue(TSingleJob(nullptr, 0)); HasJob.Signal(); - while (AtomicGet(ThreadCount)) { + while (ThreadCount.load()) { ThreadYield(); } } @@ -167,7 +167,7 @@ void* NPar::TLocalExecutor::TImpl::HostWorkerThread(void* p) { auto* const ctx = (TImpl*)p; TThread::SetCurrentThreadName("ParLocalExecutor"); - ctx->WorkerThreadId = AtomicAdd(ctx->ThreadId, 1); + ctx->WorkerThreadId = ctx->ThreadId.fetch_add(1) + 1; for (bool cont = true; cont;) { TSingleJob job; bool gotJob = false; @@ -188,35 +188,35 @@ void* NPar::TLocalExecutor::TImpl::HostWorkerThread(void* p) { job.Exec->LocalExec(job.Id); RegularYield(); } else { - AtomicAdd(ctx->QueueSize, 1); + ++ctx->QueueSize; ctx->JobQueue.Enqueue(job); ctx->HasJob.Signal(); cont = false; } } - AtomicAdd(ctx->ThreadCount, -1); + --ctx->ThreadCount; return nullptr; } bool NPar::TLocalExecutor::TImpl::GetJob(TSingleJob* job) { if (JobQueue.Dequeue(job)) { CurrentTaskPriority = TLocalExecutor::HIGH_PRIORITY; - AtomicAdd(QueueSize, -1); + --QueueSize; return true; } else if (MedJobQueue.Dequeue(job)) { CurrentTaskPriority = TLocalExecutor::MED_PRIORITY; - AtomicAdd(MPQueueSize, -1); + --MPQueueSize; return true; } else if (LowJobQueue.Dequeue(job)) { CurrentTaskPriority = TLocalExecutor::LOW_PRIORITY; - AtomicAdd(LPQueueSize, -1); + --LPQueueSize; return true; } return false; } void NPar::TLocalExecutor::TImpl::RunNewThread() { - AtomicAdd(ThreadCount, 1); + ++ThreadCount; TThread thr(HostWorkerThread, this); thr.Start(); thr.Detach(); @@ -224,13 +224,13 @@ void NPar::TLocalExecutor::TImpl::RunNewThread() { void NPar::TLocalExecutor::TImpl::LaunchRange(TIntrusivePtr<TLocalRangeExecutor> rangeExec, int queueSizeLimit, - TAtomic* queueSize, + std::atomic<int>* queueSize, TLockFreeQueue<TSingleJob>* jobQueue) { int count = Min<int>(ThreadCount + 1, rangeExec->GetRangeSize()); - if (queueSizeLimit >= 0 && AtomicGet(*queueSize) >= queueSizeLimit) { + if (queueSizeLimit >= 0 && queueSize->load() >= queueSizeLimit) { return; } - AtomicAdd(*queueSize, count); + queueSize->fetch_add(count); jobQueue->EnqueueAll(TVector<TSingleJob>{size_t(count), TSingleJob(rangeExec, 0)}); HasJob.Signal(); } @@ -251,15 +251,15 @@ void NPar::TLocalExecutor::Exec(TIntrusivePtr<ILocallyExecutable> exec, int id, int prior = Max<int>(Impl_->CurrentTaskPriority, flags & PRIORITY_MASK); switch (prior) { case HIGH_PRIORITY: - AtomicAdd(Impl_->QueueSize, 1); + ++Impl_->QueueSize; Impl_->JobQueue.Enqueue(TSingleJob(std::move(exec), id)); break; case MED_PRIORITY: - AtomicAdd(Impl_->MPQueueSize, 1); + ++Impl_->MPQueueSize; Impl_->MedJobQueue.Enqueue(TSingleJob(std::move(exec), id)); break; case LOW_PRIORITY: - AtomicAdd(Impl_->LPQueueSize, 1); + ++Impl_->LPQueueSize; Impl_->LowJobQueue.Enqueue(TSingleJob(std::move(exec), id)); break; default: @@ -336,26 +336,26 @@ void NPar::TLocalExecutor::ClearLPQueue() { cont = false; TSingleJob job; while (Impl_->LowJobQueue.Dequeue(&job)) { - AtomicAdd(Impl_->LPQueueSize, -1); + --Impl_->LPQueueSize; cont = true; } while (Impl_->MedJobQueue.Dequeue(&job)) { - AtomicAdd(Impl_->MPQueueSize, -1); + --Impl_->MPQueueSize; cont = true; } } } int NPar::TLocalExecutor::GetQueueSize() const noexcept { - return AtomicGet(Impl_->QueueSize); + return Impl_->QueueSize.load(); } int NPar::TLocalExecutor::GetMPQueueSize() const noexcept { - return AtomicGet(Impl_->MPQueueSize); + return Impl_->MPQueueSize.load(); } int NPar::TLocalExecutor::GetLPQueueSize() const noexcept { - return AtomicGet(Impl_->LPQueueSize); + return Impl_->LPQueueSize.load(); } int NPar::TLocalExecutor::GetWorkerThreadId() const noexcept { @@ -363,7 +363,7 @@ int NPar::TLocalExecutor::GetWorkerThreadId() const noexcept { } int NPar::TLocalExecutor::GetThreadCount() const noexcept { - return AtomicGet(Impl_->ThreadCount); + return Impl_->ThreadCount.load(); } ////////////////////////////////////////////////////////////////////////// |
