aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/threading/local_executor/local_executor.cpp
diff options
context:
space:
mode:
authorEvgueni Petrov <evgueni.s.petrov@gmail.com>2022-02-10 16:46:59 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:46:59 +0300
commit19d7d7947f95423df4b50d3a6e858cd689db06ed (patch)
treeb31e808f1761fed50236a1ae5bda03cd3bb9d428 /library/cpp/threading/local_executor/local_executor.cpp
parent122c87ebe5797d52f1800afb02c87f751135d0a4 (diff)
downloadydb-19d7d7947f95423df4b50d3a6e858cd689db06ed.tar.gz
Restoring authorship annotation for Evgueni Petrov <evgueni.s.petrov@gmail.com>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/threading/local_executor/local_executor.cpp')
-rw-r--r--library/cpp/threading/local_executor/local_executor.cpp36
1 files changed, 18 insertions, 18 deletions
diff --git a/library/cpp/threading/local_executor/local_executor.cpp b/library/cpp/threading/local_executor/local_executor.cpp
index 1d3fbb4bf4..17f4fc636b 100644
--- a/library/cpp/threading/local_executor/local_executor.cpp
+++ b/library/cpp/threading/local_executor/local_executor.cpp
@@ -84,8 +84,8 @@ namespace {
class TLocalRangeExecutor: public NPar::ILocallyExecutable {
TIntrusivePtr<NPar::ILocallyExecutable> Exec;
- alignas(64) TAtomic Counter;
- alignas(64) TAtomic WorkerCount;
+ alignas(64) TAtomic Counter;
+ alignas(64) TAtomic WorkerCount;
int LastId;
void LocalExec(int) override {
@@ -106,7 +106,7 @@ namespace {
{
}
bool DoSingleOp() {
- const int id = AtomicAdd(Counter, 1) - 1;
+ const int id = AtomicAdd(Counter, 1) - 1;
if (id >= LastId)
return false;
Exec->LocalExec(id);
@@ -116,7 +116,7 @@ namespace {
void WaitComplete() {
while (AtomicGet(WorkerCount) > 0)
RegularYield();
- }
+ }
int GetRangeSize() const {
return Max<int>(LastId - Counter, 0);
}
@@ -130,10 +130,10 @@ public:
TLockFreeQueue<TSingleJob> JobQueue;
TLockFreeQueue<TSingleJob> MedJobQueue;
TLockFreeQueue<TSingleJob> LowJobQueue;
- alignas(64) TSystemEvent HasJob;
+ alignas(64) TSystemEvent HasJob;
TAtomic ThreadCount{0};
- alignas(64) TAtomic QueueSize{0};
+ alignas(64) TAtomic QueueSize{0};
TAtomic MPQueueSize{0};
TAtomic LPQueueSize{0};
TAtomic ThreadId{0};
@@ -231,7 +231,7 @@ void NPar::TLocalExecutor::TImpl::LaunchRange(TIntrusivePtr<TLocalRangeExecutor>
return;
}
AtomicAdd(*queueSize, count);
- jobQueue->EnqueueAll(TVector<TSingleJob>{size_t(count), TSingleJob(rangeExec, 0)});
+ jobQueue->EnqueueAll(TVector<TSingleJob>{size_t(count), TSingleJob(rangeExec, 0)});
HasJob.Signal();
}
@@ -269,13 +269,13 @@ void NPar::TLocalExecutor::Exec(TIntrusivePtr<ILocallyExecutable> exec, int id,
Impl_->HasJob.Signal();
}
-void NPar::ILocalExecutor::Exec(TLocallyExecutableFunction exec, int id, int flags) {
+void NPar::ILocalExecutor::Exec(TLocallyExecutableFunction exec, int id, int flags) {
Exec(new TFunctionWrapper(std::move(exec)), id, flags);
}
void NPar::TLocalExecutor::ExecRange(TIntrusivePtr<ILocallyExecutable> exec, int firstId, int lastId, int flags) {
Y_ASSERT(lastId >= firstId);
- if (TryExecRangeSequentially([=] (int id) { exec->LocalExec(id); }, firstId, lastId, flags)) {
+ if (TryExecRangeSequentially([=] (int id) { exec->LocalExec(id); }, firstId, lastId, flags)) {
return;
}
auto rangeExec = MakeIntrusive<TLocalRangeExecutor>(std::move(exec), firstId, lastId);
@@ -305,18 +305,18 @@ void NPar::TLocalExecutor::ExecRange(TIntrusivePtr<ILocallyExecutable> exec, int
}
}
-void NPar::ILocalExecutor::ExecRange(TLocallyExecutableFunction exec, int firstId, int lastId, int flags) {
- if (TryExecRangeSequentially(exec, firstId, lastId, flags)) {
- return;
- }
+void NPar::ILocalExecutor::ExecRange(TLocallyExecutableFunction exec, int firstId, int lastId, int flags) {
+ if (TryExecRangeSequentially(exec, firstId, lastId, flags)) {
+ return;
+ }
ExecRange(new TFunctionWrapper(exec), firstId, lastId, flags);
}
-void NPar::ILocalExecutor::ExecRangeWithThrow(TLocallyExecutableFunction exec, int firstId, int lastId, int flags) {
+void NPar::ILocalExecutor::ExecRangeWithThrow(TLocallyExecutableFunction exec, int firstId, int lastId, int flags) {
Y_VERIFY((flags & WAIT_COMPLETE) != 0, "ExecRangeWithThrow() requires WAIT_COMPLETE to wait if exceptions arise.");
- if (TryExecRangeSequentially(exec, firstId, lastId, flags)) {
- return;
- }
+ if (TryExecRangeSequentially(exec, firstId, lastId, flags)) {
+ return;
+ }
TVector<NThreading::TFuture<void>> currentRun = ExecRangeWithFutures(exec, firstId, lastId, flags);
for (auto& result : currentRun) {
result.GetValueSync(); // Exception will be rethrown if exists. If several exception - only the one with minimal id is rethrown.
@@ -324,7 +324,7 @@ void NPar::ILocalExecutor::ExecRangeWithThrow(TLocallyExecutableFunction exec, i
}
TVector<NThreading::TFuture<void>>
-NPar::ILocalExecutor::ExecRangeWithFutures(TLocallyExecutableFunction exec, int firstId, int lastId, int flags) {
+NPar::ILocalExecutor::ExecRangeWithFutures(TLocallyExecutableFunction exec, int firstId, int lastId, int flags) {
TFunctionWrapperWithPromise* execWrapper = new TFunctionWrapperWithPromise(exec, firstId, lastId);
TVector<NThreading::TFuture<void>> out = execWrapper->GetFutures();
ExecRange(execWrapper, firstId, lastId, flags);