summaryrefslogtreecommitdiffstats
path: root/library/cpp/threading/local_executor/local_executor.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'library/cpp/threading/local_executor/local_executor.cpp')
-rw-r--r--library/cpp/threading/local_executor/local_executor.cpp70
1 files changed, 35 insertions, 35 deletions
diff --git a/library/cpp/threading/local_executor/local_executor.cpp b/library/cpp/threading/local_executor/local_executor.cpp
index af998300bff..95326cba334 100644
--- a/library/cpp/threading/local_executor/local_executor.cpp
+++ b/library/cpp/threading/local_executor/local_executor.cpp
@@ -3,13 +3,13 @@
#include <library/cpp/threading/future/future.h>
#include <util/generic/utility.h>
-#include <library/cpp/deprecated/atomic/atomic.h>
#include <util/system/event.h>
#include <util/system/thread.h>
#include <util/system/tls.h>
#include <util/system/yield.h>
#include <util/thread/lfqueue.h>
+#include <atomic>
#include <utility>
#ifdef _win_
@@ -84,17 +84,17 @@ namespace {
class TLocalRangeExecutor: public NPar::ILocallyExecutable {
TIntrusivePtr<NPar::ILocallyExecutable> Exec;
- alignas(64) TAtomic Counter;
- alignas(64) TAtomic WorkerCount;
+ alignas(64) std::atomic<int> Counter;
+ alignas(64) std::atomic<int> WorkerCount;
int LastId;
void LocalExec(int) override {
- AtomicAdd(WorkerCount, 1);
+ ++WorkerCount;
for (;;) {
if (!DoSingleOp())
break;
}
- AtomicAdd(WorkerCount, -1);
+ --WorkerCount;
}
public:
@@ -106,7 +106,7 @@ namespace {
{
}
bool DoSingleOp() {
- const int id = AtomicAdd(Counter, 1) - 1;
+ const int id = Counter.fetch_add(1);
if (id >= LastId)
return false;
Exec->LocalExec(id);
@@ -114,11 +114,11 @@ namespace {
return true;
}
void WaitComplete() {
- while (AtomicGet(WorkerCount) > 0)
+ while (WorkerCount.load() > 0)
RegularYield();
}
int GetRangeSize() const {
- return Max<int>(LastId - Counter, 0);
+ return Max<int>(LastId - Counter.load(), 0);
}
};
@@ -132,11 +132,11 @@ public:
TLockFreeQueue<TSingleJob> LowJobQueue;
alignas(64) TSystemEvent HasJob;
- TAtomic ThreadCount{0};
- alignas(64) TAtomic QueueSize{0};
- TAtomic MPQueueSize{0};
- TAtomic LPQueueSize{0};
- TAtomic ThreadId{0};
+ std::atomic<int> ThreadCount{0};
+ alignas(64) std::atomic<int> QueueSize{0};
+ std::atomic<int> MPQueueSize{0};
+ std::atomic<int> LPQueueSize{0};
+ std::atomic<int> ThreadId{0};
Y_THREAD(int)
CurrentTaskPriority;
@@ -147,17 +147,17 @@ public:
bool GetJob(TSingleJob* job);
void RunNewThread();
void LaunchRange(TIntrusivePtr<TLocalRangeExecutor> execRange, int queueSizeLimit,
- TAtomic* queueSize, TLockFreeQueue<TSingleJob>* jobQueue);
+ std::atomic<int>* queueSize, TLockFreeQueue<TSingleJob>* jobQueue);
TImpl() = default;
~TImpl();
};
NPar::TLocalExecutor::TImpl::~TImpl() {
- AtomicAdd(QueueSize, 1);
+ ++QueueSize;
JobQueue.Enqueue(TSingleJob(nullptr, 0));
HasJob.Signal();
- while (AtomicGet(ThreadCount)) {
+ while (ThreadCount.load()) {
ThreadYield();
}
}
@@ -167,7 +167,7 @@ void* NPar::TLocalExecutor::TImpl::HostWorkerThread(void* p) {
auto* const ctx = (TImpl*)p;
TThread::SetCurrentThreadName("ParLocalExecutor");
- ctx->WorkerThreadId = AtomicAdd(ctx->ThreadId, 1);
+ ctx->WorkerThreadId = ctx->ThreadId.fetch_add(1) + 1;
for (bool cont = true; cont;) {
TSingleJob job;
bool gotJob = false;
@@ -188,35 +188,35 @@ void* NPar::TLocalExecutor::TImpl::HostWorkerThread(void* p) {
job.Exec->LocalExec(job.Id);
RegularYield();
} else {
- AtomicAdd(ctx->QueueSize, 1);
+ ++ctx->QueueSize;
ctx->JobQueue.Enqueue(job);
ctx->HasJob.Signal();
cont = false;
}
}
- AtomicAdd(ctx->ThreadCount, -1);
+ --ctx->ThreadCount;
return nullptr;
}
bool NPar::TLocalExecutor::TImpl::GetJob(TSingleJob* job) {
if (JobQueue.Dequeue(job)) {
CurrentTaskPriority = TLocalExecutor::HIGH_PRIORITY;
- AtomicAdd(QueueSize, -1);
+ --QueueSize;
return true;
} else if (MedJobQueue.Dequeue(job)) {
CurrentTaskPriority = TLocalExecutor::MED_PRIORITY;
- AtomicAdd(MPQueueSize, -1);
+ --MPQueueSize;
return true;
} else if (LowJobQueue.Dequeue(job)) {
CurrentTaskPriority = TLocalExecutor::LOW_PRIORITY;
- AtomicAdd(LPQueueSize, -1);
+ --LPQueueSize;
return true;
}
return false;
}
void NPar::TLocalExecutor::TImpl::RunNewThread() {
- AtomicAdd(ThreadCount, 1);
+ ++ThreadCount;
TThread thr(HostWorkerThread, this);
thr.Start();
thr.Detach();
@@ -224,13 +224,13 @@ void NPar::TLocalExecutor::TImpl::RunNewThread() {
void NPar::TLocalExecutor::TImpl::LaunchRange(TIntrusivePtr<TLocalRangeExecutor> rangeExec,
int queueSizeLimit,
- TAtomic* queueSize,
+ std::atomic<int>* queueSize,
TLockFreeQueue<TSingleJob>* jobQueue) {
int count = Min<int>(ThreadCount + 1, rangeExec->GetRangeSize());
- if (queueSizeLimit >= 0 && AtomicGet(*queueSize) >= queueSizeLimit) {
+ if (queueSizeLimit >= 0 && queueSize->load() >= queueSizeLimit) {
return;
}
- AtomicAdd(*queueSize, count);
+ queueSize->fetch_add(count);
jobQueue->EnqueueAll(TVector<TSingleJob>{size_t(count), TSingleJob(rangeExec, 0)});
HasJob.Signal();
}
@@ -251,15 +251,15 @@ void NPar::TLocalExecutor::Exec(TIntrusivePtr<ILocallyExecutable> exec, int id,
int prior = Max<int>(Impl_->CurrentTaskPriority, flags & PRIORITY_MASK);
switch (prior) {
case HIGH_PRIORITY:
- AtomicAdd(Impl_->QueueSize, 1);
+ ++Impl_->QueueSize;
Impl_->JobQueue.Enqueue(TSingleJob(std::move(exec), id));
break;
case MED_PRIORITY:
- AtomicAdd(Impl_->MPQueueSize, 1);
+ ++Impl_->MPQueueSize;
Impl_->MedJobQueue.Enqueue(TSingleJob(std::move(exec), id));
break;
case LOW_PRIORITY:
- AtomicAdd(Impl_->LPQueueSize, 1);
+ ++Impl_->LPQueueSize;
Impl_->LowJobQueue.Enqueue(TSingleJob(std::move(exec), id));
break;
default:
@@ -336,26 +336,26 @@ void NPar::TLocalExecutor::ClearLPQueue() {
cont = false;
TSingleJob job;
while (Impl_->LowJobQueue.Dequeue(&job)) {
- AtomicAdd(Impl_->LPQueueSize, -1);
+ --Impl_->LPQueueSize;
cont = true;
}
while (Impl_->MedJobQueue.Dequeue(&job)) {
- AtomicAdd(Impl_->MPQueueSize, -1);
+ --Impl_->MPQueueSize;
cont = true;
}
}
}
int NPar::TLocalExecutor::GetQueueSize() const noexcept {
- return AtomicGet(Impl_->QueueSize);
+ return Impl_->QueueSize.load();
}
int NPar::TLocalExecutor::GetMPQueueSize() const noexcept {
- return AtomicGet(Impl_->MPQueueSize);
+ return Impl_->MPQueueSize.load();
}
int NPar::TLocalExecutor::GetLPQueueSize() const noexcept {
- return AtomicGet(Impl_->LPQueueSize);
+ return Impl_->LPQueueSize.load();
}
int NPar::TLocalExecutor::GetWorkerThreadId() const noexcept {
@@ -363,7 +363,7 @@ int NPar::TLocalExecutor::GetWorkerThreadId() const noexcept {
}
int NPar::TLocalExecutor::GetThreadCount() const noexcept {
- return AtomicGet(Impl_->ThreadCount);
+ return Impl_->ThreadCount.load();
}
//////////////////////////////////////////////////////////////////////////