From a727378fc47b04f22871cd625792f9347335989a Mon Sep 17 00:00:00 2001 From: tobo Date: Fri, 23 Jan 2026 20:14:10 +0300 Subject: TAtomic => std::atomic in library/cpp/threading/local_executor commit_hash:1e6b73c17aaaf69b19ff38a4b20246c45aeb3266 --- .../threading/local_executor/local_executor.cpp | 70 +++++++++++----------- 1 file changed, 35 insertions(+), 35 deletions(-) (limited to 'library/cpp/threading/local_executor/local_executor.cpp') diff --git a/library/cpp/threading/local_executor/local_executor.cpp b/library/cpp/threading/local_executor/local_executor.cpp index af998300bff..95326cba334 100644 --- a/library/cpp/threading/local_executor/local_executor.cpp +++ b/library/cpp/threading/local_executor/local_executor.cpp @@ -3,13 +3,13 @@ #include #include -#include #include #include #include #include #include +#include #include #ifdef _win_ @@ -84,17 +84,17 @@ namespace { class TLocalRangeExecutor: public NPar::ILocallyExecutable { TIntrusivePtr Exec; - alignas(64) TAtomic Counter; - alignas(64) TAtomic WorkerCount; + alignas(64) std::atomic Counter; + alignas(64) std::atomic WorkerCount; int LastId; void LocalExec(int) override { - AtomicAdd(WorkerCount, 1); + ++WorkerCount; for (;;) { if (!DoSingleOp()) break; } - AtomicAdd(WorkerCount, -1); + --WorkerCount; } public: @@ -106,7 +106,7 @@ namespace { { } bool DoSingleOp() { - const int id = AtomicAdd(Counter, 1) - 1; + const int id = Counter.fetch_add(1); if (id >= LastId) return false; Exec->LocalExec(id); @@ -114,11 +114,11 @@ namespace { return true; } void WaitComplete() { - while (AtomicGet(WorkerCount) > 0) + while (WorkerCount.load() > 0) RegularYield(); } int GetRangeSize() const { - return Max(LastId - Counter, 0); + return Max(LastId - Counter.load(), 0); } }; @@ -132,11 +132,11 @@ public: TLockFreeQueue LowJobQueue; alignas(64) TSystemEvent HasJob; - TAtomic ThreadCount{0}; - alignas(64) TAtomic QueueSize{0}; - TAtomic MPQueueSize{0}; - TAtomic LPQueueSize{0}; - TAtomic ThreadId{0}; + std::atomic ThreadCount{0}; + alignas(64) std::atomic QueueSize{0}; + std::atomic MPQueueSize{0}; + std::atomic LPQueueSize{0}; + std::atomic ThreadId{0}; Y_THREAD(int) CurrentTaskPriority; @@ -147,17 +147,17 @@ public: bool GetJob(TSingleJob* job); void RunNewThread(); void LaunchRange(TIntrusivePtr execRange, int queueSizeLimit, - TAtomic* queueSize, TLockFreeQueue* jobQueue); + std::atomic* queueSize, TLockFreeQueue* jobQueue); TImpl() = default; ~TImpl(); }; NPar::TLocalExecutor::TImpl::~TImpl() { - AtomicAdd(QueueSize, 1); + ++QueueSize; JobQueue.Enqueue(TSingleJob(nullptr, 0)); HasJob.Signal(); - while (AtomicGet(ThreadCount)) { + while (ThreadCount.load()) { ThreadYield(); } } @@ -167,7 +167,7 @@ void* NPar::TLocalExecutor::TImpl::HostWorkerThread(void* p) { auto* const ctx = (TImpl*)p; TThread::SetCurrentThreadName("ParLocalExecutor"); - ctx->WorkerThreadId = AtomicAdd(ctx->ThreadId, 1); + ctx->WorkerThreadId = ctx->ThreadId.fetch_add(1) + 1; for (bool cont = true; cont;) { TSingleJob job; bool gotJob = false; @@ -188,35 +188,35 @@ void* NPar::TLocalExecutor::TImpl::HostWorkerThread(void* p) { job.Exec->LocalExec(job.Id); RegularYield(); } else { - AtomicAdd(ctx->QueueSize, 1); + ++ctx->QueueSize; ctx->JobQueue.Enqueue(job); ctx->HasJob.Signal(); cont = false; } } - AtomicAdd(ctx->ThreadCount, -1); + --ctx->ThreadCount; return nullptr; } bool NPar::TLocalExecutor::TImpl::GetJob(TSingleJob* job) { if (JobQueue.Dequeue(job)) { CurrentTaskPriority = TLocalExecutor::HIGH_PRIORITY; - AtomicAdd(QueueSize, -1); + --QueueSize; return true; } else if (MedJobQueue.Dequeue(job)) { CurrentTaskPriority = TLocalExecutor::MED_PRIORITY; - AtomicAdd(MPQueueSize, -1); + --MPQueueSize; return true; } else if (LowJobQueue.Dequeue(job)) { CurrentTaskPriority = TLocalExecutor::LOW_PRIORITY; - AtomicAdd(LPQueueSize, -1); + --LPQueueSize; return true; } return false; } void NPar::TLocalExecutor::TImpl::RunNewThread() { - AtomicAdd(ThreadCount, 1); + ++ThreadCount; TThread thr(HostWorkerThread, this); thr.Start(); thr.Detach(); @@ -224,13 +224,13 @@ void NPar::TLocalExecutor::TImpl::RunNewThread() { void NPar::TLocalExecutor::TImpl::LaunchRange(TIntrusivePtr rangeExec, int queueSizeLimit, - TAtomic* queueSize, + std::atomic* queueSize, TLockFreeQueue* jobQueue) { int count = Min(ThreadCount + 1, rangeExec->GetRangeSize()); - if (queueSizeLimit >= 0 && AtomicGet(*queueSize) >= queueSizeLimit) { + if (queueSizeLimit >= 0 && queueSize->load() >= queueSizeLimit) { return; } - AtomicAdd(*queueSize, count); + queueSize->fetch_add(count); jobQueue->EnqueueAll(TVector{size_t(count), TSingleJob(rangeExec, 0)}); HasJob.Signal(); } @@ -251,15 +251,15 @@ void NPar::TLocalExecutor::Exec(TIntrusivePtr exec, int id, int prior = Max(Impl_->CurrentTaskPriority, flags & PRIORITY_MASK); switch (prior) { case HIGH_PRIORITY: - AtomicAdd(Impl_->QueueSize, 1); + ++Impl_->QueueSize; Impl_->JobQueue.Enqueue(TSingleJob(std::move(exec), id)); break; case MED_PRIORITY: - AtomicAdd(Impl_->MPQueueSize, 1); + ++Impl_->MPQueueSize; Impl_->MedJobQueue.Enqueue(TSingleJob(std::move(exec), id)); break; case LOW_PRIORITY: - AtomicAdd(Impl_->LPQueueSize, 1); + ++Impl_->LPQueueSize; Impl_->LowJobQueue.Enqueue(TSingleJob(std::move(exec), id)); break; default: @@ -336,26 +336,26 @@ void NPar::TLocalExecutor::ClearLPQueue() { cont = false; TSingleJob job; while (Impl_->LowJobQueue.Dequeue(&job)) { - AtomicAdd(Impl_->LPQueueSize, -1); + --Impl_->LPQueueSize; cont = true; } while (Impl_->MedJobQueue.Dequeue(&job)) { - AtomicAdd(Impl_->MPQueueSize, -1); + --Impl_->MPQueueSize; cont = true; } } } int NPar::TLocalExecutor::GetQueueSize() const noexcept { - return AtomicGet(Impl_->QueueSize); + return Impl_->QueueSize.load(); } int NPar::TLocalExecutor::GetMPQueueSize() const noexcept { - return AtomicGet(Impl_->MPQueueSize); + return Impl_->MPQueueSize.load(); } int NPar::TLocalExecutor::GetLPQueueSize() const noexcept { - return AtomicGet(Impl_->LPQueueSize); + return Impl_->LPQueueSize.load(); } int NPar::TLocalExecutor::GetWorkerThreadId() const noexcept { @@ -363,7 +363,7 @@ int NPar::TLocalExecutor::GetWorkerThreadId() const noexcept { } int NPar::TLocalExecutor::GetThreadCount() const noexcept { - return AtomicGet(Impl_->ThreadCount); + return Impl_->ThreadCount.load(); } ////////////////////////////////////////////////////////////////////////// -- cgit v1.3