aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/threading/local_executor
diff options
context:
space:
mode:
authoryazevnul <yazevnul@yandex-team.ru>2022-02-10 16:46:46 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:46:46 +0300
commit8cbc307de0221f84c80c42dcbe07d40727537e2c (patch)
tree625d5a673015d1df891e051033e9fcde5c7be4e5 /library/cpp/threading/local_executor
parent30d1ef3941e0dc835be7609de5ebee66958f215a (diff)
downloadydb-8cbc307de0221f84c80c42dcbe07d40727537e2c.tar.gz
Restoring authorship annotation for <yazevnul@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/threading/local_executor')
-rw-r--r--library/cpp/threading/local_executor/local_executor.cpp568
-rw-r--r--library/cpp/threading/local_executor/local_executor.h92
-rw-r--r--library/cpp/threading/local_executor/ut/local_executor_ut.cpp56
-rw-r--r--library/cpp/threading/local_executor/ut/ya.make6
-rw-r--r--library/cpp/threading/local_executor/ya.make4
5 files changed, 363 insertions, 363 deletions
diff --git a/library/cpp/threading/local_executor/local_executor.cpp b/library/cpp/threading/local_executor/local_executor.cpp
index 1d3fbb4bf4..6e62d09d85 100644
--- a/library/cpp/threading/local_executor/local_executor.cpp
+++ b/library/cpp/threading/local_executor/local_executor.cpp
@@ -1,17 +1,17 @@
#include "local_executor.h"
#include <library/cpp/threading/future/future.h>
-
-#include <util/generic/utility.h>
-#include <util/system/atomic.h>
-#include <util/system/event.h>
+
+#include <util/generic/utility.h>
+#include <util/system/atomic.h>
+#include <util/system/event.h>
#include <util/system/thread.h>
-#include <util/system/tls.h>
+#include <util/system/tls.h>
#include <util/system/yield.h>
-#include <util/thread/lfqueue.h>
-
-#include <utility>
+#include <util/thread/lfqueue.h>
+#include <utility>
+
#ifdef _win_
static void RegularYield() {
}
@@ -23,11 +23,11 @@ static void RegularYield() {
}
#endif
-namespace {
- struct TFunctionWrapper : NPar::ILocallyExecutable {
- NPar::TLocallyExecutableFunction Exec;
- TFunctionWrapper(NPar::TLocallyExecutableFunction exec)
- : Exec(std::move(exec))
+namespace {
+ struct TFunctionWrapper : NPar::ILocallyExecutable {
+ NPar::TLocallyExecutableFunction Exec;
+ TFunctionWrapper(NPar::TLocallyExecutableFunction exec)
+ : Exec(std::move(exec))
{
}
void LocalExec(int id) override {
@@ -35,15 +35,15 @@ namespace {
}
};
- class TFunctionWrapperWithPromise: public NPar::ILocallyExecutable {
+ class TFunctionWrapperWithPromise: public NPar::ILocallyExecutable {
private:
- NPar::TLocallyExecutableFunction Exec;
+ NPar::TLocallyExecutableFunction Exec;
int FirstId, LastId;
TVector<NThreading::TPromise<void>> Promises;
public:
- TFunctionWrapperWithPromise(NPar::TLocallyExecutableFunction exec, int firstId, int lastId)
- : Exec(std::move(exec))
+ TFunctionWrapperWithPromise(NPar::TLocallyExecutableFunction exec, int firstId, int lastId)
+ : Exec(std::move(exec))
, FirstId(firstId)
, LastId(lastId)
{
@@ -70,300 +70,300 @@ namespace {
}
};
- struct TSingleJob {
- TIntrusivePtr<NPar::ILocallyExecutable> Exec;
- int Id{0};
+ struct TSingleJob {
+ TIntrusivePtr<NPar::ILocallyExecutable> Exec;
+ int Id{0};
- TSingleJob() = default;
- TSingleJob(TIntrusivePtr<NPar::ILocallyExecutable> exec, int id)
- : Exec(std::move(exec))
- , Id(id)
- {
+ TSingleJob() = default;
+ TSingleJob(TIntrusivePtr<NPar::ILocallyExecutable> exec, int id)
+ : Exec(std::move(exec))
+ , Id(id)
+ {
}
- };
+ };
- class TLocalRangeExecutor: public NPar::ILocallyExecutable {
- TIntrusivePtr<NPar::ILocallyExecutable> Exec;
+ class TLocalRangeExecutor: public NPar::ILocallyExecutable {
+ TIntrusivePtr<NPar::ILocallyExecutable> Exec;
alignas(64) TAtomic Counter;
alignas(64) TAtomic WorkerCount;
- int LastId;
-
- void LocalExec(int) override {
- AtomicAdd(WorkerCount, 1);
- for (;;) {
- if (!DoSingleOp())
- break;
- }
- AtomicAdd(WorkerCount, -1);
+ int LastId;
+
+ void LocalExec(int) override {
+ AtomicAdd(WorkerCount, 1);
+ for (;;) {
+ if (!DoSingleOp())
+ break;
+ }
+ AtomicAdd(WorkerCount, -1);
}
- public:
- TLocalRangeExecutor(TIntrusivePtr<ILocallyExecutable> exec, int firstId, int lastId)
- : Exec(std::move(exec))
- , Counter(firstId)
- , WorkerCount(0)
- , LastId(lastId)
- {
+ public:
+ TLocalRangeExecutor(TIntrusivePtr<ILocallyExecutable> exec, int firstId, int lastId)
+ : Exec(std::move(exec))
+ , Counter(firstId)
+ , WorkerCount(0)
+ , LastId(lastId)
+ {
}
- bool DoSingleOp() {
+ bool DoSingleOp() {
const int id = AtomicAdd(Counter, 1) - 1;
- if (id >= LastId)
- return false;
- Exec->LocalExec(id);
- RegularYield();
- return true;
+ if (id >= LastId)
+ return false;
+ Exec->LocalExec(id);
+ RegularYield();
+ return true;
}
- void WaitComplete() {
- while (AtomicGet(WorkerCount) > 0)
- RegularYield();
+ void WaitComplete() {
+ while (AtomicGet(WorkerCount) > 0)
+ RegularYield();
}
- int GetRangeSize() const {
- return Max<int>(LastId - Counter, 0);
+ int GetRangeSize() const {
+ return Max<int>(LastId - Counter, 0);
}
- };
+ };
}
-
-//////////////////////////////////////////////////////////////////////////
-class NPar::TLocalExecutor::TImpl {
-public:
- TLockFreeQueue<TSingleJob> JobQueue;
- TLockFreeQueue<TSingleJob> MedJobQueue;
- TLockFreeQueue<TSingleJob> LowJobQueue;
+
+//////////////////////////////////////////////////////////////////////////
+class NPar::TLocalExecutor::TImpl {
+public:
+ TLockFreeQueue<TSingleJob> JobQueue;
+ TLockFreeQueue<TSingleJob> MedJobQueue;
+ TLockFreeQueue<TSingleJob> LowJobQueue;
alignas(64) TSystemEvent HasJob;
-
- TAtomic ThreadCount{0};
+
+ TAtomic ThreadCount{0};
alignas(64) TAtomic QueueSize{0};
- TAtomic MPQueueSize{0};
- TAtomic LPQueueSize{0};
- TAtomic ThreadId{0};
-
- Y_THREAD(int)
- CurrentTaskPriority;
- Y_THREAD(int)
- WorkerThreadId;
-
- static void* HostWorkerThread(void* p);
- bool GetJob(TSingleJob* job);
- void RunNewThread();
- void LaunchRange(TIntrusivePtr<TLocalRangeExecutor> execRange, int queueSizeLimit,
- TAtomic* queueSize, TLockFreeQueue<TSingleJob>* jobQueue);
-
- TImpl() = default;
- ~TImpl();
-};
-
-NPar::TLocalExecutor::TImpl::~TImpl() {
- AtomicAdd(QueueSize, 1);
- JobQueue.Enqueue(TSingleJob(nullptr, 0));
- HasJob.Signal();
- while (AtomicGet(ThreadCount)) {
- ThreadYield();
- }
-}
-
-void* NPar::TLocalExecutor::TImpl::HostWorkerThread(void* p) {
- static const int FAST_ITERATIONS = 200;
-
- auto* const ctx = (TImpl*)p;
+ TAtomic MPQueueSize{0};
+ TAtomic LPQueueSize{0};
+ TAtomic ThreadId{0};
+
+ Y_THREAD(int)
+ CurrentTaskPriority;
+ Y_THREAD(int)
+ WorkerThreadId;
+
+ static void* HostWorkerThread(void* p);
+ bool GetJob(TSingleJob* job);
+ void RunNewThread();
+ void LaunchRange(TIntrusivePtr<TLocalRangeExecutor> execRange, int queueSizeLimit,
+ TAtomic* queueSize, TLockFreeQueue<TSingleJob>* jobQueue);
+
+ TImpl() = default;
+ ~TImpl();
+};
+
+NPar::TLocalExecutor::TImpl::~TImpl() {
+ AtomicAdd(QueueSize, 1);
+ JobQueue.Enqueue(TSingleJob(nullptr, 0));
+ HasJob.Signal();
+ while (AtomicGet(ThreadCount)) {
+ ThreadYield();
+ }
+}
+
+void* NPar::TLocalExecutor::TImpl::HostWorkerThread(void* p) {
+ static const int FAST_ITERATIONS = 200;
+
+ auto* const ctx = (TImpl*)p;
TThread::SetCurrentThreadName("ParLocalExecutor");
- ctx->WorkerThreadId = AtomicAdd(ctx->ThreadId, 1);
- for (bool cont = true; cont;) {
- TSingleJob job;
- bool gotJob = false;
- for (int iter = 0; iter < FAST_ITERATIONS; ++iter) {
- if (ctx->GetJob(&job)) {
- gotJob = true;
- break;
- }
- }
- if (!gotJob) {
- ctx->HasJob.Reset();
- if (!ctx->GetJob(&job)) {
- ctx->HasJob.Wait();
- continue;
- }
- }
- if (job.Exec.Get()) {
- job.Exec->LocalExec(job.Id);
- RegularYield();
- } else {
- AtomicAdd(ctx->QueueSize, 1);
- ctx->JobQueue.Enqueue(job);
- ctx->HasJob.Signal();
- cont = false;
- }
- }
- AtomicAdd(ctx->ThreadCount, -1);
- return nullptr;
-}
-
-bool NPar::TLocalExecutor::TImpl::GetJob(TSingleJob* job) {
- if (JobQueue.Dequeue(job)) {
- CurrentTaskPriority = TLocalExecutor::HIGH_PRIORITY;
- AtomicAdd(QueueSize, -1);
- return true;
- } else if (MedJobQueue.Dequeue(job)) {
- CurrentTaskPriority = TLocalExecutor::MED_PRIORITY;
- AtomicAdd(MPQueueSize, -1);
- return true;
- } else if (LowJobQueue.Dequeue(job)) {
- CurrentTaskPriority = TLocalExecutor::LOW_PRIORITY;
- AtomicAdd(LPQueueSize, -1);
- return true;
- }
- return false;
-}
-
-void NPar::TLocalExecutor::TImpl::RunNewThread() {
- AtomicAdd(ThreadCount, 1);
- TThread thr(HostWorkerThread, this);
- thr.Start();
- thr.Detach();
-}
-
-void NPar::TLocalExecutor::TImpl::LaunchRange(TIntrusivePtr<TLocalRangeExecutor> rangeExec,
- int queueSizeLimit,
- TAtomic* queueSize,
- TLockFreeQueue<TSingleJob>* jobQueue) {
- int count = Min<int>(ThreadCount + 1, rangeExec->GetRangeSize());
- if (queueSizeLimit >= 0 && AtomicGet(*queueSize) >= queueSizeLimit) {
- return;
- }
- AtomicAdd(*queueSize, count);
+ ctx->WorkerThreadId = AtomicAdd(ctx->ThreadId, 1);
+ for (bool cont = true; cont;) {
+ TSingleJob job;
+ bool gotJob = false;
+ for (int iter = 0; iter < FAST_ITERATIONS; ++iter) {
+ if (ctx->GetJob(&job)) {
+ gotJob = true;
+ break;
+ }
+ }
+ if (!gotJob) {
+ ctx->HasJob.Reset();
+ if (!ctx->GetJob(&job)) {
+ ctx->HasJob.Wait();
+ continue;
+ }
+ }
+ if (job.Exec.Get()) {
+ job.Exec->LocalExec(job.Id);
+ RegularYield();
+ } else {
+ AtomicAdd(ctx->QueueSize, 1);
+ ctx->JobQueue.Enqueue(job);
+ ctx->HasJob.Signal();
+ cont = false;
+ }
+ }
+ AtomicAdd(ctx->ThreadCount, -1);
+ return nullptr;
+}
+
+bool NPar::TLocalExecutor::TImpl::GetJob(TSingleJob* job) {
+ if (JobQueue.Dequeue(job)) {
+ CurrentTaskPriority = TLocalExecutor::HIGH_PRIORITY;
+ AtomicAdd(QueueSize, -1);
+ return true;
+ } else if (MedJobQueue.Dequeue(job)) {
+ CurrentTaskPriority = TLocalExecutor::MED_PRIORITY;
+ AtomicAdd(MPQueueSize, -1);
+ return true;
+ } else if (LowJobQueue.Dequeue(job)) {
+ CurrentTaskPriority = TLocalExecutor::LOW_PRIORITY;
+ AtomicAdd(LPQueueSize, -1);
+ return true;
+ }
+ return false;
+}
+
+void NPar::TLocalExecutor::TImpl::RunNewThread() {
+ AtomicAdd(ThreadCount, 1);
+ TThread thr(HostWorkerThread, this);
+ thr.Start();
+ thr.Detach();
+}
+
+void NPar::TLocalExecutor::TImpl::LaunchRange(TIntrusivePtr<TLocalRangeExecutor> rangeExec,
+ int queueSizeLimit,
+ TAtomic* queueSize,
+ TLockFreeQueue<TSingleJob>* jobQueue) {
+ int count = Min<int>(ThreadCount + 1, rangeExec->GetRangeSize());
+ if (queueSizeLimit >= 0 && AtomicGet(*queueSize) >= queueSizeLimit) {
+ return;
+ }
+ AtomicAdd(*queueSize, count);
jobQueue->EnqueueAll(TVector<TSingleJob>{size_t(count), TSingleJob(rangeExec, 0)});
- HasJob.Signal();
-}
-
-NPar::TLocalExecutor::TLocalExecutor()
- : Impl_{MakeHolder<TImpl>()} {
-}
-
-NPar::TLocalExecutor::~TLocalExecutor() = default;
-
-void NPar::TLocalExecutor::RunAdditionalThreads(int threadCount) {
- for (int i = 0; i < threadCount; i++)
- Impl_->RunNewThread();
-}
-
-void NPar::TLocalExecutor::Exec(TIntrusivePtr<ILocallyExecutable> exec, int id, int flags) {
- Y_ASSERT((flags & WAIT_COMPLETE) == 0); // unsupported
- int prior = Max<int>(Impl_->CurrentTaskPriority, flags & PRIORITY_MASK);
- switch (prior) {
- case HIGH_PRIORITY:
- AtomicAdd(Impl_->QueueSize, 1);
- Impl_->JobQueue.Enqueue(TSingleJob(std::move(exec), id));
- break;
- case MED_PRIORITY:
- AtomicAdd(Impl_->MPQueueSize, 1);
- Impl_->MedJobQueue.Enqueue(TSingleJob(std::move(exec), id));
- break;
- case LOW_PRIORITY:
- AtomicAdd(Impl_->LPQueueSize, 1);
- Impl_->LowJobQueue.Enqueue(TSingleJob(std::move(exec), id));
- break;
- default:
- Y_ASSERT(0);
- break;
- }
- Impl_->HasJob.Signal();
-}
-
+ HasJob.Signal();
+}
+
+NPar::TLocalExecutor::TLocalExecutor()
+ : Impl_{MakeHolder<TImpl>()} {
+}
+
+NPar::TLocalExecutor::~TLocalExecutor() = default;
+
+void NPar::TLocalExecutor::RunAdditionalThreads(int threadCount) {
+ for (int i = 0; i < threadCount; i++)
+ Impl_->RunNewThread();
+}
+
+void NPar::TLocalExecutor::Exec(TIntrusivePtr<ILocallyExecutable> exec, int id, int flags) {
+ Y_ASSERT((flags & WAIT_COMPLETE) == 0); // unsupported
+ int prior = Max<int>(Impl_->CurrentTaskPriority, flags & PRIORITY_MASK);
+ switch (prior) {
+ case HIGH_PRIORITY:
+ AtomicAdd(Impl_->QueueSize, 1);
+ Impl_->JobQueue.Enqueue(TSingleJob(std::move(exec), id));
+ break;
+ case MED_PRIORITY:
+ AtomicAdd(Impl_->MPQueueSize, 1);
+ Impl_->MedJobQueue.Enqueue(TSingleJob(std::move(exec), id));
+ break;
+ case LOW_PRIORITY:
+ AtomicAdd(Impl_->LPQueueSize, 1);
+ Impl_->LowJobQueue.Enqueue(TSingleJob(std::move(exec), id));
+ break;
+ default:
+ Y_ASSERT(0);
+ break;
+ }
+ Impl_->HasJob.Signal();
+}
+
void NPar::ILocalExecutor::Exec(TLocallyExecutableFunction exec, int id, int flags) {
- Exec(new TFunctionWrapper(std::move(exec)), id, flags);
-}
-
-void NPar::TLocalExecutor::ExecRange(TIntrusivePtr<ILocallyExecutable> exec, int firstId, int lastId, int flags) {
- Y_ASSERT(lastId >= firstId);
+ Exec(new TFunctionWrapper(std::move(exec)), id, flags);
+}
+
+void NPar::TLocalExecutor::ExecRange(TIntrusivePtr<ILocallyExecutable> exec, int firstId, int lastId, int flags) {
+ Y_ASSERT(lastId >= firstId);
if (TryExecRangeSequentially([=] (int id) { exec->LocalExec(id); }, firstId, lastId, flags)) {
- return;
- }
- auto rangeExec = MakeIntrusive<TLocalRangeExecutor>(std::move(exec), firstId, lastId);
- int queueSizeLimit = (flags & WAIT_COMPLETE) ? 10000 : -1;
- int prior = Max<int>(Impl_->CurrentTaskPriority, flags & PRIORITY_MASK);
- switch (prior) {
- case HIGH_PRIORITY:
- Impl_->LaunchRange(rangeExec, queueSizeLimit, &Impl_->QueueSize, &Impl_->JobQueue);
- break;
- case MED_PRIORITY:
- Impl_->LaunchRange(rangeExec, queueSizeLimit, &Impl_->MPQueueSize, &Impl_->MedJobQueue);
- break;
- case LOW_PRIORITY:
- Impl_->LaunchRange(rangeExec, queueSizeLimit, &Impl_->LPQueueSize, &Impl_->LowJobQueue);
- break;
- default:
- Y_ASSERT(0);
- break;
- }
- if (flags & WAIT_COMPLETE) {
- int keepPrior = Impl_->CurrentTaskPriority;
- Impl_->CurrentTaskPriority = prior;
- while (rangeExec->DoSingleOp()) {
- }
- Impl_->CurrentTaskPriority = keepPrior;
- rangeExec->WaitComplete();
- }
-}
-
+ return;
+ }
+ auto rangeExec = MakeIntrusive<TLocalRangeExecutor>(std::move(exec), firstId, lastId);
+ int queueSizeLimit = (flags & WAIT_COMPLETE) ? 10000 : -1;
+ int prior = Max<int>(Impl_->CurrentTaskPriority, flags & PRIORITY_MASK);
+ switch (prior) {
+ case HIGH_PRIORITY:
+ Impl_->LaunchRange(rangeExec, queueSizeLimit, &Impl_->QueueSize, &Impl_->JobQueue);
+ break;
+ case MED_PRIORITY:
+ Impl_->LaunchRange(rangeExec, queueSizeLimit, &Impl_->MPQueueSize, &Impl_->MedJobQueue);
+ break;
+ case LOW_PRIORITY:
+ Impl_->LaunchRange(rangeExec, queueSizeLimit, &Impl_->LPQueueSize, &Impl_->LowJobQueue);
+ break;
+ default:
+ Y_ASSERT(0);
+ break;
+ }
+ if (flags & WAIT_COMPLETE) {
+ int keepPrior = Impl_->CurrentTaskPriority;
+ Impl_->CurrentTaskPriority = prior;
+ while (rangeExec->DoSingleOp()) {
+ }
+ Impl_->CurrentTaskPriority = keepPrior;
+ rangeExec->WaitComplete();
+ }
+}
+
void NPar::ILocalExecutor::ExecRange(TLocallyExecutableFunction exec, int firstId, int lastId, int flags) {
if (TryExecRangeSequentially(exec, firstId, lastId, flags)) {
return;
}
- ExecRange(new TFunctionWrapper(exec), firstId, lastId, flags);
-}
-
+ ExecRange(new TFunctionWrapper(exec), firstId, lastId, flags);
+}
+
void NPar::ILocalExecutor::ExecRangeWithThrow(TLocallyExecutableFunction exec, int firstId, int lastId, int flags) {
- Y_VERIFY((flags & WAIT_COMPLETE) != 0, "ExecRangeWithThrow() requires WAIT_COMPLETE to wait if exceptions arise.");
+ Y_VERIFY((flags & WAIT_COMPLETE) != 0, "ExecRangeWithThrow() requires WAIT_COMPLETE to wait if exceptions arise.");
if (TryExecRangeSequentially(exec, firstId, lastId, flags)) {
return;
}
- TVector<NThreading::TFuture<void>> currentRun = ExecRangeWithFutures(exec, firstId, lastId, flags);
- for (auto& result : currentRun) {
- result.GetValueSync(); // Exception will be rethrown if exists. If several exception - only the one with minimal id is rethrown.
- }
-}
-
-TVector<NThreading::TFuture<void>>
+ TVector<NThreading::TFuture<void>> currentRun = ExecRangeWithFutures(exec, firstId, lastId, flags);
+ for (auto& result : currentRun) {
+ result.GetValueSync(); // Exception will be rethrown if exists. If several exception - only the one with minimal id is rethrown.
+ }
+}
+
+TVector<NThreading::TFuture<void>>
NPar::ILocalExecutor::ExecRangeWithFutures(TLocallyExecutableFunction exec, int firstId, int lastId, int flags) {
- TFunctionWrapperWithPromise* execWrapper = new TFunctionWrapperWithPromise(exec, firstId, lastId);
- TVector<NThreading::TFuture<void>> out = execWrapper->GetFutures();
- ExecRange(execWrapper, firstId, lastId, flags);
- return out;
-}
-
-void NPar::TLocalExecutor::ClearLPQueue() {
- for (bool cont = true; cont;) {
- cont = false;
- TSingleJob job;
- while (Impl_->LowJobQueue.Dequeue(&job)) {
- AtomicAdd(Impl_->LPQueueSize, -1);
- cont = true;
- }
- while (Impl_->MedJobQueue.Dequeue(&job)) {
- AtomicAdd(Impl_->MPQueueSize, -1);
- cont = true;
- }
- }
-}
-
-int NPar::TLocalExecutor::GetQueueSize() const noexcept {
- return AtomicGet(Impl_->QueueSize);
-}
-
-int NPar::TLocalExecutor::GetMPQueueSize() const noexcept {
- return AtomicGet(Impl_->MPQueueSize);
-}
-
-int NPar::TLocalExecutor::GetLPQueueSize() const noexcept {
- return AtomicGet(Impl_->LPQueueSize);
-}
-
+ TFunctionWrapperWithPromise* execWrapper = new TFunctionWrapperWithPromise(exec, firstId, lastId);
+ TVector<NThreading::TFuture<void>> out = execWrapper->GetFutures();
+ ExecRange(execWrapper, firstId, lastId, flags);
+ return out;
+}
+
+void NPar::TLocalExecutor::ClearLPQueue() {
+ for (bool cont = true; cont;) {
+ cont = false;
+ TSingleJob job;
+ while (Impl_->LowJobQueue.Dequeue(&job)) {
+ AtomicAdd(Impl_->LPQueueSize, -1);
+ cont = true;
+ }
+ while (Impl_->MedJobQueue.Dequeue(&job)) {
+ AtomicAdd(Impl_->MPQueueSize, -1);
+ cont = true;
+ }
+ }
+}
+
+int NPar::TLocalExecutor::GetQueueSize() const noexcept {
+ return AtomicGet(Impl_->QueueSize);
+}
+
+int NPar::TLocalExecutor::GetMPQueueSize() const noexcept {
+ return AtomicGet(Impl_->MPQueueSize);
+}
+
+int NPar::TLocalExecutor::GetLPQueueSize() const noexcept {
+ return AtomicGet(Impl_->LPQueueSize);
+}
+
int NPar::TLocalExecutor::GetWorkerThreadId() const noexcept {
- return Impl_->WorkerThreadId;
-}
-
-int NPar::TLocalExecutor::GetThreadCount() const noexcept {
- return AtomicGet(Impl_->ThreadCount);
-}
-
-//////////////////////////////////////////////////////////////////////////
+ return Impl_->WorkerThreadId;
+}
+
+int NPar::TLocalExecutor::GetThreadCount() const noexcept {
+ return AtomicGet(Impl_->ThreadCount);
+}
+
+//////////////////////////////////////////////////////////////////////////
diff --git a/library/cpp/threading/local_executor/local_executor.h b/library/cpp/threading/local_executor/local_executor.h
index c1c824f67c..aa500d34d3 100644
--- a/library/cpp/threading/local_executor/local_executor.h
+++ b/library/cpp/threading/local_executor/local_executor.h
@@ -1,23 +1,23 @@
#pragma once
#include <library/cpp/threading/future/future.h>
-
+
#include <util/generic/cast.h>
-#include <util/generic/fwd.h>
-#include <util/generic/noncopyable.h>
+#include <util/generic/fwd.h>
+#include <util/generic/noncopyable.h>
#include <util/generic/ptr.h>
-#include <util/generic/singleton.h>
+#include <util/generic/singleton.h>
#include <util/generic/ymath.h>
-
+
#include <functional>
namespace NPar {
struct ILocallyExecutable : virtual public TThrRefBase {
- // Must be implemented by the end user to define job that will be processed by one of
- // executor threads.
- //
- // @param id Job parameter, typically an index pointing somewhere in array, or just
- // some dummy value, e.g. `0`.
+ // Must be implemented by the end user to define job that will be processed by one of
+ // executor threads.
+ //
+ // @param id Job parameter, typically an index pointing somewhere in array, or just
+ // some dummy value, e.g. `0`.
virtual void LocalExec(int id) = 0;
};
@@ -31,7 +31,7 @@ namespace NPar {
ILocalExecutor() = default;
virtual ~ILocalExecutor() = default;
- enum EFlags : int {
+ enum EFlags : int {
HIGH_PRIORITY = 0,
MED_PRIORITY = 1,
LOW_PRIORITY = 2,
@@ -58,8 +58,8 @@ namespace NPar {
virtual int GetWorkerThreadId() const noexcept = 0;
virtual int GetThreadCount() const noexcept = 0;
- // Describes a range of tasks with parameters from integer range [FirstId, LastId).
- //
+ // Describes a range of tasks with parameters from integer range [FirstId, LastId).
+ //
class TExecRangeParams {
public:
template <typename TFirst, typename TLast>
@@ -70,9 +70,9 @@ namespace NPar {
Y_ASSERT(LastId >= FirstId);
SetBlockSize(1);
}
- // Partition tasks into `blockCount` blocks of approximately equal size, each of which
- // will be executed as a separate bigger task.
- //
+ // Partition tasks into `blockCount` blocks of approximately equal size, each of which
+ // will be executed as a separate bigger task.
+ //
template <typename TBlockCount>
TExecRangeParams& SetBlockCount(TBlockCount blockCount) {
Y_ASSERT(SafeIntegerCast<int>(blockCount) > 0 || FirstId == LastId);
@@ -81,9 +81,9 @@ namespace NPar {
BlockEqualToThreads = false;
return *this;
}
- // Partition tasks into blocks of approximately `blockSize` size, each of which will
- // be executed as a separate bigger task.
- //
+ // Partition tasks into blocks of approximately `blockSize` size, each of which will
+ // be executed as a separate bigger task.
+ //
template <typename TBlockSize>
TExecRangeParams& SetBlockSize(TBlockSize blockSize) {
Y_ASSERT(SafeIntegerCast<int>(blockSize) > 0 || FirstId == LastId);
@@ -92,9 +92,9 @@ namespace NPar {
BlockEqualToThreads = false;
return *this;
}
- // Partition tasks into thread count blocks of approximately equal size, each of which
- // will be executed as a separate bigger task.
- //
+ // Partition tasks into thread count blocks of approximately equal size, each of which
+ // will be executed as a separate bigger task.
+ //
TExecRangeParams& SetBlockCountToThreadCount() {
BlockEqualToThreads = true;
return *this;
@@ -107,9 +107,9 @@ namespace NPar {
Y_ASSERT(!BlockEqualToThreads);
return BlockSize;
}
- bool GetBlockEqualToThreads() {
- return BlockEqualToThreads;
- }
+ bool GetBlockEqualToThreads() {
+ return BlockEqualToThreads;
+ }
const int FirstId = 0;
const int LastId = 0;
@@ -120,26 +120,26 @@ namespace NPar {
bool BlockEqualToThreads;
};
- // `Exec` and `ExecRange` versions that accept functions.
- //
- void Exec(TLocallyExecutableFunction exec, int id, int flags);
- void ExecRange(TLocallyExecutableFunction exec, int firstId, int lastId, int flags);
-
- // Version of `ExecRange` that throws exception from task with minimal id if at least one of
- // task threw an exception.
- //
- void ExecRangeWithThrow(TLocallyExecutableFunction exec, int firstId, int lastId, int flags);
-
- // Version of `ExecRange` that returns vector of futures, thus allowing to retry any task if
- // it fails.
- //
- TVector<NThreading::TFuture<void>> ExecRangeWithFutures(TLocallyExecutableFunction exec, int firstId, int lastId, int flags);
-
+ // `Exec` and `ExecRange` versions that accept functions.
+ //
+ void Exec(TLocallyExecutableFunction exec, int id, int flags);
+ void ExecRange(TLocallyExecutableFunction exec, int firstId, int lastId, int flags);
+
+ // Version of `ExecRange` that throws exception from task with minimal id if at least one of
+ // task threw an exception.
+ //
+ void ExecRangeWithThrow(TLocallyExecutableFunction exec, int firstId, int lastId, int flags);
+
+ // Version of `ExecRange` that returns vector of futures, thus allowing to retry any task if
+ // it fails.
+ //
+ TVector<NThreading::TFuture<void>> ExecRangeWithFutures(TLocallyExecutableFunction exec, int firstId, int lastId, int flags);
+
template <typename TBody>
static inline auto BlockedLoopBody(const TExecRangeParams& params, const TBody& body) {
return [=](int blockId) {
- const int blockFirstId = params.FirstId + blockId * params.GetBlockSize();
- const int blockLastId = Min(params.LastId, blockFirstId + params.GetBlockSize());
+ const int blockFirstId = params.FirstId + blockId * params.GetBlockSize();
+ const int blockLastId = Min(params.LastId, blockFirstId + params.GetBlockSize());
for (int i = blockFirstId; i < blockLastId; ++i) {
body(i);
}
@@ -151,10 +151,10 @@ namespace NPar {
if (TryExecRangeSequentially(body, params.FirstId, params.LastId, flags)) {
return;
}
- if (params.GetBlockEqualToThreads()) {
- params.SetBlockCount(GetThreadCount() + ((flags & WAIT_COMPLETE) != 0)); // ThreadCount or ThreadCount+1 depending on WaitFlag
+ if (params.GetBlockEqualToThreads()) {
+ params.SetBlockCount(GetThreadCount() + ((flags & WAIT_COMPLETE) != 0)); // ThreadCount or ThreadCount+1 depending on WaitFlag
}
- ExecRange(BlockedLoopBody(params, body), 0, params.GetBlockCount(), flags);
+ ExecRange(BlockedLoopBody(params, body), 0, params.GetBlockCount(), flags);
}
template <typename TBody>
@@ -269,7 +269,7 @@ namespace NPar {
THolder<TImpl> Impl_;
};
- static inline TLocalExecutor& LocalExecutor() {
+ static inline TLocalExecutor& LocalExecutor() {
return *Singleton<TLocalExecutor>();
}
diff --git a/library/cpp/threading/local_executor/ut/local_executor_ut.cpp b/library/cpp/threading/local_executor/ut/local_executor_ut.cpp
index ac5737717c..fe7dab0899 100644
--- a/library/cpp/threading/local_executor/ut/local_executor_ut.cpp
+++ b/library/cpp/threading/local_executor/ut/local_executor_ut.cpp
@@ -1,10 +1,10 @@
#include <library/cpp/threading/local_executor/local_executor.h>
#include <library/cpp/threading/future/future.h>
-
+
#include <library/cpp/testing/unittest/registar.h>
#include <util/system/mutex.h>
#include <util/system/rwlock.h>
-#include <util/generic/algorithm.h>
+#include <util/generic/algorithm.h>
using namespace NPar;
@@ -14,7 +14,7 @@ class TTestException: public yexception {
static const int DefaultThreadsCount = 41;
static const int DefaultRangeSize = 999;
-Y_UNIT_TEST_SUITE(ExecRangeWithFutures){
+Y_UNIT_TEST_SUITE(ExecRangeWithFutures){
bool AllOf(const TVector<int>& vec, int value){
return AllOf(vec, [value](int element) { return value == element; });
}
@@ -41,23 +41,23 @@ void AsyncRunAndWaitFuturesReady(int rangeSize, int threads) {
UNIT_ASSERT(AllOf(data, 1));
}
-Y_UNIT_TEST(AsyncRunRangeAndWaitFuturesReady) {
+Y_UNIT_TEST(AsyncRunRangeAndWaitFuturesReady) {
AsyncRunAndWaitFuturesReady(DefaultRangeSize, DefaultThreadsCount);
}
-Y_UNIT_TEST(AsyncRunOneTaskAndWaitFuturesReady) {
+Y_UNIT_TEST(AsyncRunOneTaskAndWaitFuturesReady) {
AsyncRunAndWaitFuturesReady(1, DefaultThreadsCount);
}
-Y_UNIT_TEST(AsyncRunRangeAndWaitFuturesReadyOneExtraThread) {
+Y_UNIT_TEST(AsyncRunRangeAndWaitFuturesReadyOneExtraThread) {
AsyncRunAndWaitFuturesReady(DefaultRangeSize, 1);
}
-Y_UNIT_TEST(AsyncRunOneThreadAndWaitFuturesReadyOneExtraThread) {
+Y_UNIT_TEST(AsyncRunOneThreadAndWaitFuturesReadyOneExtraThread) {
AsyncRunAndWaitFuturesReady(1, 1);
}
-Y_UNIT_TEST(AsyncRunTwoRangesAndWaitFuturesReady) {
+Y_UNIT_TEST(AsyncRunTwoRangesAndWaitFuturesReady) {
TLocalExecutor localExecutor;
localExecutor.RunAdditionalThreads(DefaultThreadsCount);
TAtomic signal = 0;
@@ -118,23 +118,23 @@ void AsyncRunRangeAndWaitExceptions(int rangeSize, int threadsCount) {
UNIT_ASSERT(AllOf(data, 1));
}
-Y_UNIT_TEST(AsyncRunRangeAndWaitExceptions) {
+Y_UNIT_TEST(AsyncRunRangeAndWaitExceptions) {
AsyncRunRangeAndWaitExceptions(DefaultRangeSize, DefaultThreadsCount);
}
-Y_UNIT_TEST(AsyncRunOneTaskAndWaitExceptions) {
+Y_UNIT_TEST(AsyncRunOneTaskAndWaitExceptions) {
AsyncRunRangeAndWaitExceptions(1, DefaultThreadsCount);
}
-Y_UNIT_TEST(AsyncRunRangeAndWaitExceptionsOneExtraThread) {
+Y_UNIT_TEST(AsyncRunRangeAndWaitExceptionsOneExtraThread) {
AsyncRunRangeAndWaitExceptions(DefaultRangeSize, 1);
}
-Y_UNIT_TEST(AsyncRunOneTaskAndWaitExceptionsOneExtraThread) {
+Y_UNIT_TEST(AsyncRunOneTaskAndWaitExceptionsOneExtraThread) {
AsyncRunRangeAndWaitExceptions(1, 1);
}
-Y_UNIT_TEST(AsyncRunTwoRangesAndWaitExceptions) {
+Y_UNIT_TEST(AsyncRunTwoRangesAndWaitExceptions) {
TLocalExecutor localExecutor;
localExecutor.RunAdditionalThreads(DefaultThreadsCount);
TAtomic signal = 0;
@@ -209,33 +209,33 @@ void RunRangeAndCheckExceptionsWithWaitComplete(int rangeSize, int threadsCount)
UNIT_ASSERT(AllOf(data, 1));
}
-Y_UNIT_TEST(RunRangeAndCheckExceptionsWithWaitComplete) {
+Y_UNIT_TEST(RunRangeAndCheckExceptionsWithWaitComplete) {
RunRangeAndCheckExceptionsWithWaitComplete(DefaultRangeSize, DefaultThreadsCount);
}
-Y_UNIT_TEST(RunOneAndCheckExceptionsWithWaitComplete) {
+Y_UNIT_TEST(RunOneAndCheckExceptionsWithWaitComplete) {
RunRangeAndCheckExceptionsWithWaitComplete(1, DefaultThreadsCount);
}
-Y_UNIT_TEST(RunRangeAndCheckExceptionsWithWaitCompleteOneExtraThread) {
+Y_UNIT_TEST(RunRangeAndCheckExceptionsWithWaitCompleteOneExtraThread) {
RunRangeAndCheckExceptionsWithWaitComplete(DefaultRangeSize, 1);
}
-Y_UNIT_TEST(RunOneAndCheckExceptionsWithWaitCompleteOneExtraThread) {
+Y_UNIT_TEST(RunOneAndCheckExceptionsWithWaitCompleteOneExtraThread) {
RunRangeAndCheckExceptionsWithWaitComplete(1, 1);
}
-Y_UNIT_TEST(RunRangeAndCheckExceptionsWithWaitCompleteZeroExtraThreads) {
+Y_UNIT_TEST(RunRangeAndCheckExceptionsWithWaitCompleteZeroExtraThreads) {
RunRangeAndCheckExceptionsWithWaitComplete(DefaultRangeSize, 0);
}
-Y_UNIT_TEST(RunOneAndCheckExceptionsWithWaitCompleteZeroExtraThreads) {
+Y_UNIT_TEST(RunOneAndCheckExceptionsWithWaitCompleteZeroExtraThreads) {
RunRangeAndCheckExceptionsWithWaitComplete(1, 0);
}
}
;
-Y_UNIT_TEST_SUITE(ExecRangeWithThrow){
+Y_UNIT_TEST_SUITE(ExecRangeWithThrow){
void RunParallelWhichThrowsTTestException(int rangeStart, int rangeSize, int threadsCount, int flags, TAtomic& processed){
AtomicSet(processed, 0);
TLocalExecutor localExecutor;
@@ -247,7 +247,7 @@ localExecutor.ExecRangeWithThrow([&processed](int) {
rangeStart, rangeStart + rangeSize, flags);
}
-Y_UNIT_TEST(RunParallelWhichThrowsTTestException) {
+Y_UNIT_TEST(RunParallelWhichThrowsTTestException) {
TAtomic processed = 0;
UNIT_ASSERT_EXCEPTION(
RunParallelWhichThrowsTTestException(10, 40, DefaultThreadsCount,
@@ -264,32 +264,32 @@ void ThrowAndCatchTTestException(int rangeSize, int threadsCount, int flags) {
UNIT_ASSERT(AtomicGet(processed) == rangeSize);
}
-Y_UNIT_TEST(ThrowAndCatchTTestExceptionLowPriority) {
+Y_UNIT_TEST(ThrowAndCatchTTestExceptionLowPriority) {
ThrowAndCatchTTestException(DefaultRangeSize, DefaultThreadsCount,
TLocalExecutor::EFlags::WAIT_COMPLETE | TLocalExecutor::EFlags::LOW_PRIORITY);
}
-Y_UNIT_TEST(ThrowAndCatchTTestExceptionMedPriority) {
+Y_UNIT_TEST(ThrowAndCatchTTestExceptionMedPriority) {
ThrowAndCatchTTestException(DefaultRangeSize, DefaultThreadsCount,
TLocalExecutor::EFlags::WAIT_COMPLETE | TLocalExecutor::EFlags::MED_PRIORITY);
}
-Y_UNIT_TEST(ThrowAndCatchTTestExceptionHighPriority) {
+Y_UNIT_TEST(ThrowAndCatchTTestExceptionHighPriority) {
ThrowAndCatchTTestException(DefaultRangeSize, DefaultThreadsCount,
TLocalExecutor::EFlags::WAIT_COMPLETE | TLocalExecutor::EFlags::HIGH_PRIORITY);
}
-Y_UNIT_TEST(ThrowAndCatchTTestExceptionWaitComplete) {
+Y_UNIT_TEST(ThrowAndCatchTTestExceptionWaitComplete) {
ThrowAndCatchTTestException(DefaultRangeSize, DefaultThreadsCount,
TLocalExecutor::EFlags::WAIT_COMPLETE);
}
-Y_UNIT_TEST(RethrowExeptionSequentialWaitComplete) {
+Y_UNIT_TEST(RethrowExeptionSequentialWaitComplete) {
ThrowAndCatchTTestException(DefaultRangeSize, 0,
TLocalExecutor::EFlags::WAIT_COMPLETE);
}
-Y_UNIT_TEST(RethrowExeptionOneExtraThreadWaitComplete) {
+Y_UNIT_TEST(RethrowExeptionOneExtraThreadWaitComplete) {
ThrowAndCatchTTestException(DefaultRangeSize, 1,
TLocalExecutor::EFlags::WAIT_COMPLETE);
}
@@ -314,7 +314,7 @@ void CatchTTestExceptionFromNested(TAtomic& processed1, TAtomic& processed2) {
0, DefaultRangeSize, TLocalExecutor::EFlags::WAIT_COMPLETE);
}
-Y_UNIT_TEST(NestedParallelExceptionsDoNotLeak) {
+Y_UNIT_TEST(NestedParallelExceptionsDoNotLeak) {
TAtomic processed1 = 0;
TAtomic processed2 = 0;
UNIT_ASSERT_NO_EXCEPTION(
diff --git a/library/cpp/threading/local_executor/ut/ya.make b/library/cpp/threading/local_executor/ut/ya.make
index be579a5ca0..2983c4f466 100644
--- a/library/cpp/threading/local_executor/ut/ya.make
+++ b/library/cpp/threading/local_executor/ut/ya.make
@@ -1,10 +1,10 @@
OWNER(
g:matrixnet
- gulin
-)
+ gulin
+)
UNITTEST_FOR(library/cpp/threading/local_executor)
-
+
SRCS(
local_executor_ut.cpp
)
diff --git a/library/cpp/threading/local_executor/ya.make b/library/cpp/threading/local_executor/ya.make
index df210f92bb..516be66703 100644
--- a/library/cpp/threading/local_executor/ya.make
+++ b/library/cpp/threading/local_executor/ya.make
@@ -5,8 +5,8 @@ OWNER(
espetrov
)
-LIBRARY()
-
+LIBRARY()
+
SRCS(
local_executor.cpp
tbb_local_executor.cpp