aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp
diff options
context:
space:
mode:
authorEvgueni Petrov <evgueni.s.petrov@gmail.com>2022-02-10 16:46:59 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:46:59 +0300
commit19d7d7947f95423df4b50d3a6e858cd689db06ed (patch)
treeb31e808f1761fed50236a1ae5bda03cd3bb9d428 /library/cpp
parent122c87ebe5797d52f1800afb02c87f751135d0a4 (diff)
downloadydb-19d7d7947f95423df4b50d3a6e858cd689db06ed.tar.gz
Restoring authorship annotation for Evgueni Petrov <evgueni.s.petrov@gmail.com>. Commit 1 of 2.
Diffstat (limited to 'library/cpp')
-rw-r--r--library/cpp/threading/local_executor/local_executor.cpp36
-rw-r--r--library/cpp/threading/local_executor/local_executor.h302
2 files changed, 169 insertions, 169 deletions
diff --git a/library/cpp/threading/local_executor/local_executor.cpp b/library/cpp/threading/local_executor/local_executor.cpp
index 1d3fbb4bf4..17f4fc636b 100644
--- a/library/cpp/threading/local_executor/local_executor.cpp
+++ b/library/cpp/threading/local_executor/local_executor.cpp
@@ -84,8 +84,8 @@ namespace {
class TLocalRangeExecutor: public NPar::ILocallyExecutable {
TIntrusivePtr<NPar::ILocallyExecutable> Exec;
- alignas(64) TAtomic Counter;
- alignas(64) TAtomic WorkerCount;
+ alignas(64) TAtomic Counter;
+ alignas(64) TAtomic WorkerCount;
int LastId;
void LocalExec(int) override {
@@ -106,7 +106,7 @@ namespace {
{
}
bool DoSingleOp() {
- const int id = AtomicAdd(Counter, 1) - 1;
+ const int id = AtomicAdd(Counter, 1) - 1;
if (id >= LastId)
return false;
Exec->LocalExec(id);
@@ -116,7 +116,7 @@ namespace {
void WaitComplete() {
while (AtomicGet(WorkerCount) > 0)
RegularYield();
- }
+ }
int GetRangeSize() const {
return Max<int>(LastId - Counter, 0);
}
@@ -130,10 +130,10 @@ public:
TLockFreeQueue<TSingleJob> JobQueue;
TLockFreeQueue<TSingleJob> MedJobQueue;
TLockFreeQueue<TSingleJob> LowJobQueue;
- alignas(64) TSystemEvent HasJob;
+ alignas(64) TSystemEvent HasJob;
TAtomic ThreadCount{0};
- alignas(64) TAtomic QueueSize{0};
+ alignas(64) TAtomic QueueSize{0};
TAtomic MPQueueSize{0};
TAtomic LPQueueSize{0};
TAtomic ThreadId{0};
@@ -231,7 +231,7 @@ void NPar::TLocalExecutor::TImpl::LaunchRange(TIntrusivePtr<TLocalRangeExecutor>
return;
}
AtomicAdd(*queueSize, count);
- jobQueue->EnqueueAll(TVector<TSingleJob>{size_t(count), TSingleJob(rangeExec, 0)});
+ jobQueue->EnqueueAll(TVector<TSingleJob>{size_t(count), TSingleJob(rangeExec, 0)});
HasJob.Signal();
}
@@ -269,13 +269,13 @@ void NPar::TLocalExecutor::Exec(TIntrusivePtr<ILocallyExecutable> exec, int id,
Impl_->HasJob.Signal();
}
-void NPar::ILocalExecutor::Exec(TLocallyExecutableFunction exec, int id, int flags) {
+void NPar::ILocalExecutor::Exec(TLocallyExecutableFunction exec, int id, int flags) {
Exec(new TFunctionWrapper(std::move(exec)), id, flags);
}
void NPar::TLocalExecutor::ExecRange(TIntrusivePtr<ILocallyExecutable> exec, int firstId, int lastId, int flags) {
Y_ASSERT(lastId >= firstId);
- if (TryExecRangeSequentially([=] (int id) { exec->LocalExec(id); }, firstId, lastId, flags)) {
+ if (TryExecRangeSequentially([=] (int id) { exec->LocalExec(id); }, firstId, lastId, flags)) {
return;
}
auto rangeExec = MakeIntrusive<TLocalRangeExecutor>(std::move(exec), firstId, lastId);
@@ -305,18 +305,18 @@ void NPar::TLocalExecutor::ExecRange(TIntrusivePtr<ILocallyExecutable> exec, int
}
}
-void NPar::ILocalExecutor::ExecRange(TLocallyExecutableFunction exec, int firstId, int lastId, int flags) {
- if (TryExecRangeSequentially(exec, firstId, lastId, flags)) {
- return;
- }
+void NPar::ILocalExecutor::ExecRange(TLocallyExecutableFunction exec, int firstId, int lastId, int flags) {
+ if (TryExecRangeSequentially(exec, firstId, lastId, flags)) {
+ return;
+ }
ExecRange(new TFunctionWrapper(exec), firstId, lastId, flags);
}
-void NPar::ILocalExecutor::ExecRangeWithThrow(TLocallyExecutableFunction exec, int firstId, int lastId, int flags) {
+void NPar::ILocalExecutor::ExecRangeWithThrow(TLocallyExecutableFunction exec, int firstId, int lastId, int flags) {
Y_VERIFY((flags & WAIT_COMPLETE) != 0, "ExecRangeWithThrow() requires WAIT_COMPLETE to wait if exceptions arise.");
- if (TryExecRangeSequentially(exec, firstId, lastId, flags)) {
- return;
- }
+ if (TryExecRangeSequentially(exec, firstId, lastId, flags)) {
+ return;
+ }
TVector<NThreading::TFuture<void>> currentRun = ExecRangeWithFutures(exec, firstId, lastId, flags);
for (auto& result : currentRun) {
result.GetValueSync(); // Exception will be rethrown if exists. If several exception - only the one with minimal id is rethrown.
@@ -324,7 +324,7 @@ void NPar::ILocalExecutor::ExecRangeWithThrow(TLocallyExecutableFunction exec, i
}
TVector<NThreading::TFuture<void>>
-NPar::ILocalExecutor::ExecRangeWithFutures(TLocallyExecutableFunction exec, int firstId, int lastId, int flags) {
+NPar::ILocalExecutor::ExecRangeWithFutures(TLocallyExecutableFunction exec, int firstId, int lastId, int flags) {
TFunctionWrapperWithPromise* execWrapper = new TFunctionWrapperWithPromise(exec, firstId, lastId);
TVector<NThreading::TFuture<void>> out = execWrapper->GetFutures();
ExecRange(execWrapper, firstId, lastId, flags);
diff --git a/library/cpp/threading/local_executor/local_executor.h b/library/cpp/threading/local_executor/local_executor.h
index c1c824f67c..cbde5c62f4 100644
--- a/library/cpp/threading/local_executor/local_executor.h
+++ b/library/cpp/threading/local_executor/local_executor.h
@@ -2,7 +2,7 @@
#include <library/cpp/threading/future/future.h>
-#include <util/generic/cast.h>
+#include <util/generic/cast.h>
#include <util/generic/fwd.h>
#include <util/generic/noncopyable.h>
#include <util/generic/ptr.h>
@@ -26,11 +26,11 @@ namespace NPar {
//
using TLocallyExecutableFunction = std::function<void(int)>;
- class ILocalExecutor: public TNonCopyable {
+ class ILocalExecutor: public TNonCopyable {
public:
- ILocalExecutor() = default;
- virtual ~ILocalExecutor() = default;
-
+ ILocalExecutor() = default;
+ virtual ~ILocalExecutor() = default;
+
enum EFlags : int {
HIGH_PRIORITY = 0,
MED_PRIORITY = 1,
@@ -38,60 +38,60 @@ namespace NPar {
PRIORITY_MASK = 3,
WAIT_COMPLETE = 4
};
-
- // Add task for further execution.
- //
- // @param exec Task description.
- // @param id Task argument.
- // @param flags Bitmask composed by `HIGH_PRIORITY`, `MED_PRIORITY`, `LOW_PRIORITY`
- // and `WAIT_COMPLETE`.
- virtual void Exec(TIntrusivePtr<ILocallyExecutable> exec, int id, int flags) = 0;
-
- // Add tasks range for further execution.
- //
- // @param exec Task description.
- // @param firstId, lastId Task arguments [firstId, lastId)
- // @param flags Same as for `Exec`.
- virtual void ExecRange(TIntrusivePtr<ILocallyExecutable> exec, int firstId, int lastId, int flags) = 0;
-
- // 0-based ILocalExecutor worker thread identification
- virtual int GetWorkerThreadId() const noexcept = 0;
- virtual int GetThreadCount() const noexcept = 0;
-
+
+ // Add task for further execution.
+ //
+ // @param exec Task description.
+ // @param id Task argument.
+ // @param flags Bitmask composed by `HIGH_PRIORITY`, `MED_PRIORITY`, `LOW_PRIORITY`
+ // and `WAIT_COMPLETE`.
+ virtual void Exec(TIntrusivePtr<ILocallyExecutable> exec, int id, int flags) = 0;
+
+ // Add tasks range for further execution.
+ //
+ // @param exec Task description.
+ // @param firstId, lastId Task arguments [firstId, lastId)
+ // @param flags Same as for `Exec`.
+ virtual void ExecRange(TIntrusivePtr<ILocallyExecutable> exec, int firstId, int lastId, int flags) = 0;
+
+ // 0-based ILocalExecutor worker thread identification
+ virtual int GetWorkerThreadId() const noexcept = 0;
+ virtual int GetThreadCount() const noexcept = 0;
+
// Describes a range of tasks with parameters from integer range [FirstId, LastId).
//
class TExecRangeParams {
- public:
- template <typename TFirst, typename TLast>
- TExecRangeParams(TFirst firstId, TLast lastId)
- : FirstId(SafeIntegerCast<int>(firstId))
- , LastId(SafeIntegerCast<int>(lastId))
- {
- Y_ASSERT(LastId >= FirstId);
+ public:
+ template <typename TFirst, typename TLast>
+ TExecRangeParams(TFirst firstId, TLast lastId)
+ : FirstId(SafeIntegerCast<int>(firstId))
+ , LastId(SafeIntegerCast<int>(lastId))
+ {
+ Y_ASSERT(LastId >= FirstId);
SetBlockSize(1);
- }
+ }
// Partition tasks into `blockCount` blocks of approximately equal size, each of which
// will be executed as a separate bigger task.
//
- template <typename TBlockCount>
- TExecRangeParams& SetBlockCount(TBlockCount blockCount) {
+ template <typename TBlockCount>
+ TExecRangeParams& SetBlockCount(TBlockCount blockCount) {
Y_ASSERT(SafeIntegerCast<int>(blockCount) > 0 || FirstId == LastId);
BlockSize = FirstId == LastId ? 0 : CeilDiv(LastId - FirstId, SafeIntegerCast<int>(blockCount));
BlockCount = BlockSize == 0 ? 0 : CeilDiv(LastId - FirstId, BlockSize);
BlockEqualToThreads = false;
- return *this;
- }
+ return *this;
+ }
// Partition tasks into blocks of approximately `blockSize` size, each of which will
// be executed as a separate bigger task.
//
- template <typename TBlockSize>
- TExecRangeParams& SetBlockSize(TBlockSize blockSize) {
+ template <typename TBlockSize>
+ TExecRangeParams& SetBlockSize(TBlockSize blockSize) {
Y_ASSERT(SafeIntegerCast<int>(blockSize) > 0 || FirstId == LastId);
- BlockSize = SafeIntegerCast<int>(blockSize);
+ BlockSize = SafeIntegerCast<int>(blockSize);
BlockCount = BlockSize == 0 ? 0 : CeilDiv(LastId - FirstId, BlockSize);
BlockEqualToThreads = false;
- return *this;
- }
+ return *this;
+ }
// Partition tasks into thread count blocks of approximately equal size, each of which
// will be executed as a separate bigger task.
//
@@ -99,27 +99,27 @@ namespace NPar {
BlockEqualToThreads = true;
return *this;
}
- int GetBlockCount() const {
+ int GetBlockCount() const {
Y_ASSERT(!BlockEqualToThreads);
- return BlockCount;
- }
- int GetBlockSize() const {
+ return BlockCount;
+ }
+ int GetBlockSize() const {
Y_ASSERT(!BlockEqualToThreads);
- return BlockSize;
- }
+ return BlockSize;
+ }
bool GetBlockEqualToThreads() {
return BlockEqualToThreads;
}
-
- const int FirstId = 0;
- const int LastId = 0;
-
- private:
+
+ const int FirstId = 0;
+ const int LastId = 0;
+
+ private:
int BlockSize;
int BlockCount;
bool BlockEqualToThreads;
- };
-
+ };
+
// `Exec` and `ExecRange` versions that accept functions.
//
void Exec(TLocallyExecutableFunction exec, int id, int flags);
@@ -136,26 +136,26 @@ namespace NPar {
TVector<NThreading::TFuture<void>> ExecRangeWithFutures(TLocallyExecutableFunction exec, int firstId, int lastId, int flags);
template <typename TBody>
- static inline auto BlockedLoopBody(const TExecRangeParams& params, const TBody& body) {
+ static inline auto BlockedLoopBody(const TExecRangeParams& params, const TBody& body) {
return [=](int blockId) {
const int blockFirstId = params.FirstId + blockId * params.GetBlockSize();
const int blockLastId = Min(params.LastId, blockFirstId + params.GetBlockSize());
- for (int i = blockFirstId; i < blockLastId; ++i) {
- body(i);
- }
- };
- }
-
- template <typename TBody>
+ for (int i = blockFirstId; i < blockLastId; ++i) {
+ body(i);
+ }
+ };
+ }
+
+ template <typename TBody>
inline void ExecRange(TBody&& body, TExecRangeParams params, int flags) {
- if (TryExecRangeSequentially(body, params.FirstId, params.LastId, flags)) {
+ if (TryExecRangeSequentially(body, params.FirstId, params.LastId, flags)) {
return;
}
if (params.GetBlockEqualToThreads()) {
params.SetBlockCount(GetThreadCount() + ((flags & WAIT_COMPLETE) != 0)); // ThreadCount or ThreadCount+1 depending on WaitFlag
- }
+ }
ExecRange(BlockedLoopBody(params, body), 0, params.GetBlockCount(), flags);
- }
+ }
template <typename TBody>
inline void ExecRangeBlockedWithThrow(TBody&& body, int firstId, int lastId, int batchSizeOrZeroForAutoBatchSize, int flags) {
@@ -185,97 +185,97 @@ namespace NPar {
}
}
- template <typename TBody>
- static inline bool TryExecRangeSequentially(TBody&& body, int firstId, int lastId, int flags) {
- if (lastId == firstId) {
- return true;
- }
- if ((flags & WAIT_COMPLETE) && lastId - firstId == 1) {
- body(firstId);
- return true;
- }
- return false;
- }
- };
-
- // `TLocalExecutor` provides facilities for easy parallelization of existing code and cycles.
- //
- // Examples:
- // Execute one task with medium priority and wait for it completion.
- // ```
- // LocalExecutor().Run(4);
- // TEvent event;
- // LocalExecutor().Exec([](int) {
- // SomeFunc();
- // event.Signal();
- // }, 0, TLocalExecutor::MED_PRIORITY);
- //
- // SomeOtherCode();
- // event.WaitI();
- // ```
- //
- // Execute range of tasks with medium priority.
- // ```
- // LocalExecutor().Run(4);
- // LocalExecutor().ExecRange([](int id) {
- // SomeFunc(id);
- // }, TExecRangeParams(0, 10), TLocalExecutor::WAIT_COMPLETE | TLocalExecutor::MED_PRIORITY);
- // ```
- //
- class TLocalExecutor final: public ILocalExecutor {
- public:
- using EFlags = ILocalExecutor::EFlags;
-
- // Creates executor without threads. You'll need to explicitly call `RunAdditionalThreads`
- // to add threads to underlying thread pool.
- //
- TLocalExecutor();
- ~TLocalExecutor();
-
- int GetQueueSize() const noexcept;
- int GetMPQueueSize() const noexcept;
- int GetLPQueueSize() const noexcept;
- void ClearLPQueue();
-
- // 0-based TLocalExecutor worker thread identification
- int GetWorkerThreadId() const noexcept override;
- int GetThreadCount() const noexcept override;
-
- // **Add** threads to underlying thread pool.
- //
- // @param threadCount Number of threads to add.
- void RunAdditionalThreads(int threadCount);
-
- // Add task for further execution.
- //
- // @param exec Task description.
- // @param id Task argument.
- // @param flags Bitmask composed by `HIGH_PRIORITY`, `MED_PRIORITY`, `LOW_PRIORITY`
- // and `WAIT_COMPLETE`.
- void Exec(TIntrusivePtr<ILocallyExecutable> exec, int id, int flags) override;
-
- // Add tasks range for further execution.
- //
- // @param exec Task description.
- // @param firstId, lastId Task arguments [firstId, lastId)
- // @param flags Same as for `Exec`.
- void ExecRange(TIntrusivePtr<ILocallyExecutable> exec, int firstId, int lastId, int flags) override;
-
- using ILocalExecutor::Exec;
- using ILocalExecutor::ExecRange;
-
- private:
- class TImpl;
- THolder<TImpl> Impl_;
+ template <typename TBody>
+ static inline bool TryExecRangeSequentially(TBody&& body, int firstId, int lastId, int flags) {
+ if (lastId == firstId) {
+ return true;
+ }
+ if ((flags & WAIT_COMPLETE) && lastId - firstId == 1) {
+ body(firstId);
+ return true;
+ }
+ return false;
+ }
};
+ // `TLocalExecutor` provides facilities for easy parallelization of existing code and cycles.
+ //
+ // Examples:
+ // Execute one task with medium priority and wait for it completion.
+ // ```
+ // LocalExecutor().Run(4);
+ // TEvent event;
+ // LocalExecutor().Exec([](int) {
+ // SomeFunc();
+ // event.Signal();
+ // }, 0, TLocalExecutor::MED_PRIORITY);
+ //
+ // SomeOtherCode();
+ // event.WaitI();
+ // ```
+ //
+ // Execute range of tasks with medium priority.
+ // ```
+ // LocalExecutor().Run(4);
+ // LocalExecutor().ExecRange([](int id) {
+ // SomeFunc(id);
+ // }, TExecRangeParams(0, 10), TLocalExecutor::WAIT_COMPLETE | TLocalExecutor::MED_PRIORITY);
+ // ```
+ //
+ class TLocalExecutor final: public ILocalExecutor {
+ public:
+ using EFlags = ILocalExecutor::EFlags;
+
+ // Creates executor without threads. You'll need to explicitly call `RunAdditionalThreads`
+ // to add threads to underlying thread pool.
+ //
+ TLocalExecutor();
+ ~TLocalExecutor();
+
+ int GetQueueSize() const noexcept;
+ int GetMPQueueSize() const noexcept;
+ int GetLPQueueSize() const noexcept;
+ void ClearLPQueue();
+
+ // 0-based TLocalExecutor worker thread identification
+ int GetWorkerThreadId() const noexcept override;
+ int GetThreadCount() const noexcept override;
+
+ // **Add** threads to underlying thread pool.
+ //
+ // @param threadCount Number of threads to add.
+ void RunAdditionalThreads(int threadCount);
+
+ // Add task for further execution.
+ //
+ // @param exec Task description.
+ // @param id Task argument.
+ // @param flags Bitmask composed by `HIGH_PRIORITY`, `MED_PRIORITY`, `LOW_PRIORITY`
+ // and `WAIT_COMPLETE`.
+ void Exec(TIntrusivePtr<ILocallyExecutable> exec, int id, int flags) override;
+
+ // Add tasks range for further execution.
+ //
+ // @param exec Task description.
+ // @param firstId, lastId Task arguments [firstId, lastId)
+ // @param flags Same as for `Exec`.
+ void ExecRange(TIntrusivePtr<ILocallyExecutable> exec, int firstId, int lastId, int flags) override;
+
+ using ILocalExecutor::Exec;
+ using ILocalExecutor::ExecRange;
+
+ private:
+ class TImpl;
+ THolder<TImpl> Impl_;
+ };
+
static inline TLocalExecutor& LocalExecutor() {
return *Singleton<TLocalExecutor>();
}
template <typename TBody>
- inline void ParallelFor(ILocalExecutor& executor, ui32 from, ui32 to, TBody&& body) {
- ILocalExecutor::TExecRangeParams params(from, to);
+ inline void ParallelFor(ILocalExecutor& executor, ui32 from, ui32 to, TBody&& body) {
+ ILocalExecutor::TExecRangeParams params(from, to);
params.SetBlockCountToThreadCount();
executor.ExecRange(std::forward<TBody>(body), params, TLocalExecutor::WAIT_COMPLETE);
}
@@ -287,7 +287,7 @@ namespace NPar {
template <typename TBody>
inline void AsyncParallelFor(ui32 from, ui32 to, TBody&& body) {
- ILocalExecutor::TExecRangeParams params(from, to);
+ ILocalExecutor::TExecRangeParams params(from, to);
params.SetBlockCountToThreadCount();
LocalExecutor().ExecRange(std::forward<TBody>(body), params, 0);
}