diff options
author | Evgueni Petrov <evgueni.s.petrov@gmail.com> | 2022-02-10 16:46:59 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:46:59 +0300 |
commit | 19d7d7947f95423df4b50d3a6e858cd689db06ed (patch) | |
tree | b31e808f1761fed50236a1ae5bda03cd3bb9d428 /library/cpp/threading/local_executor/local_executor.cpp | |
parent | 122c87ebe5797d52f1800afb02c87f751135d0a4 (diff) | |
download | ydb-19d7d7947f95423df4b50d3a6e858cd689db06ed.tar.gz |
Restoring authorship annotation for Evgueni Petrov <evgueni.s.petrov@gmail.com>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/threading/local_executor/local_executor.cpp')
-rw-r--r-- | library/cpp/threading/local_executor/local_executor.cpp | 36 |
1 files changed, 18 insertions, 18 deletions
diff --git a/library/cpp/threading/local_executor/local_executor.cpp b/library/cpp/threading/local_executor/local_executor.cpp index 1d3fbb4bf4..17f4fc636b 100644 --- a/library/cpp/threading/local_executor/local_executor.cpp +++ b/library/cpp/threading/local_executor/local_executor.cpp @@ -84,8 +84,8 @@ namespace { class TLocalRangeExecutor: public NPar::ILocallyExecutable { TIntrusivePtr<NPar::ILocallyExecutable> Exec; - alignas(64) TAtomic Counter; - alignas(64) TAtomic WorkerCount; + alignas(64) TAtomic Counter; + alignas(64) TAtomic WorkerCount; int LastId; void LocalExec(int) override { @@ -106,7 +106,7 @@ namespace { { } bool DoSingleOp() { - const int id = AtomicAdd(Counter, 1) - 1; + const int id = AtomicAdd(Counter, 1) - 1; if (id >= LastId) return false; Exec->LocalExec(id); @@ -116,7 +116,7 @@ namespace { void WaitComplete() { while (AtomicGet(WorkerCount) > 0) RegularYield(); - } + } int GetRangeSize() const { return Max<int>(LastId - Counter, 0); } @@ -130,10 +130,10 @@ public: TLockFreeQueue<TSingleJob> JobQueue; TLockFreeQueue<TSingleJob> MedJobQueue; TLockFreeQueue<TSingleJob> LowJobQueue; - alignas(64) TSystemEvent HasJob; + alignas(64) TSystemEvent HasJob; TAtomic ThreadCount{0}; - alignas(64) TAtomic QueueSize{0}; + alignas(64) TAtomic QueueSize{0}; TAtomic MPQueueSize{0}; TAtomic LPQueueSize{0}; TAtomic ThreadId{0}; @@ -231,7 +231,7 @@ void NPar::TLocalExecutor::TImpl::LaunchRange(TIntrusivePtr<TLocalRangeExecutor> return; } AtomicAdd(*queueSize, count); - jobQueue->EnqueueAll(TVector<TSingleJob>{size_t(count), TSingleJob(rangeExec, 0)}); + jobQueue->EnqueueAll(TVector<TSingleJob>{size_t(count), TSingleJob(rangeExec, 0)}); HasJob.Signal(); } @@ -269,13 +269,13 @@ void NPar::TLocalExecutor::Exec(TIntrusivePtr<ILocallyExecutable> exec, int id, Impl_->HasJob.Signal(); } -void NPar::ILocalExecutor::Exec(TLocallyExecutableFunction exec, int id, int flags) { +void NPar::ILocalExecutor::Exec(TLocallyExecutableFunction exec, int id, int flags) { Exec(new TFunctionWrapper(std::move(exec)), id, flags); } void NPar::TLocalExecutor::ExecRange(TIntrusivePtr<ILocallyExecutable> exec, int firstId, int lastId, int flags) { Y_ASSERT(lastId >= firstId); - if (TryExecRangeSequentially([=] (int id) { exec->LocalExec(id); }, firstId, lastId, flags)) { + if (TryExecRangeSequentially([=] (int id) { exec->LocalExec(id); }, firstId, lastId, flags)) { return; } auto rangeExec = MakeIntrusive<TLocalRangeExecutor>(std::move(exec), firstId, lastId); @@ -305,18 +305,18 @@ void NPar::TLocalExecutor::ExecRange(TIntrusivePtr<ILocallyExecutable> exec, int } } -void NPar::ILocalExecutor::ExecRange(TLocallyExecutableFunction exec, int firstId, int lastId, int flags) { - if (TryExecRangeSequentially(exec, firstId, lastId, flags)) { - return; - } +void NPar::ILocalExecutor::ExecRange(TLocallyExecutableFunction exec, int firstId, int lastId, int flags) { + if (TryExecRangeSequentially(exec, firstId, lastId, flags)) { + return; + } ExecRange(new TFunctionWrapper(exec), firstId, lastId, flags); } -void NPar::ILocalExecutor::ExecRangeWithThrow(TLocallyExecutableFunction exec, int firstId, int lastId, int flags) { +void NPar::ILocalExecutor::ExecRangeWithThrow(TLocallyExecutableFunction exec, int firstId, int lastId, int flags) { Y_VERIFY((flags & WAIT_COMPLETE) != 0, "ExecRangeWithThrow() requires WAIT_COMPLETE to wait if exceptions arise."); - if (TryExecRangeSequentially(exec, firstId, lastId, flags)) { - return; - } + if (TryExecRangeSequentially(exec, firstId, lastId, flags)) { + return; + } TVector<NThreading::TFuture<void>> currentRun = ExecRangeWithFutures(exec, firstId, lastId, flags); for (auto& result : currentRun) { result.GetValueSync(); // Exception will be rethrown if exists. If several exception - only the one with minimal id is rethrown. @@ -324,7 +324,7 @@ void NPar::ILocalExecutor::ExecRangeWithThrow(TLocallyExecutableFunction exec, i } TVector<NThreading::TFuture<void>> -NPar::ILocalExecutor::ExecRangeWithFutures(TLocallyExecutableFunction exec, int firstId, int lastId, int flags) { +NPar::ILocalExecutor::ExecRangeWithFutures(TLocallyExecutableFunction exec, int firstId, int lastId, int flags) { TFunctionWrapperWithPromise* execWrapper = new TFunctionWrapperWithPromise(exec, firstId, lastId); TVector<NThreading::TFuture<void>> out = execWrapper->GetFutures(); ExecRange(execWrapper, firstId, lastId, flags); |