aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/threading
diff options
context:
space:
mode:
authorStanislav Kirillov <staskirillov@gmail.com>2022-02-10 16:46:07 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:46:07 +0300
commit92fe2b1e7bc79f7b95adef61714fc003f6ea4a1c (patch)
tree817034f4ca57c9f841bb047ec94630c2e78a2b1d /library/cpp/threading
parent53c76da6d9f6cc5a17f6029df396f0e3bc1ff47d (diff)
downloadydb-92fe2b1e7bc79f7b95adef61714fc003f6ea4a1c.tar.gz
Restoring authorship annotation for Stanislav Kirillov <staskirillov@gmail.com>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/threading')
-rw-r--r--library/cpp/threading/local_executor/README.md86
-rw-r--r--library/cpp/threading/local_executor/local_executor.cpp26
-rw-r--r--library/cpp/threading/local_executor/local_executor.h28
-rw-r--r--library/cpp/threading/local_executor/ya.make14
-rw-r--r--library/cpp/threading/poor_man_openmp/thread_helper.h78
5 files changed, 116 insertions, 116 deletions
diff --git a/library/cpp/threading/local_executor/README.md b/library/cpp/threading/local_executor/README.md
index aaad2e2986..0fdd6f6173 100644
--- a/library/cpp/threading/local_executor/README.md
+++ b/library/cpp/threading/local_executor/README.md
@@ -1,23 +1,23 @@
-# Library for parallel task execution in thread pool
-
-This library allows easy parallelization of existing code and cycles.
-It provides `NPar::TLocalExecutor` class and `NPar::LocalExecutor()` singleton accessor.
+# Library for parallel task execution in thread pool
+
+This library allows easy parallelization of existing code and cycles.
+It provides `NPar::TLocalExecutor` class and `NPar::LocalExecutor()` singleton accessor.
At start, `TLocalExecutor` has no threads in thread pool and all async tasks will be queued for later execution when extra threads appear.
-All tasks should be `NPar::ILocallyExecutable` child class or function equal to `std::function<void(int)>`
-
-## TLocalExecutor methods
-
-`TLocalExecutor::Run(int threadcount)` - add threads to thread pool (**WARNING!** `Run(threadcount)` will *add* `threadcount` threads to pool)
-
-`void TLocalExecutor::Exec(TLocallyExecutableFunction exec, int id, int flags)` - run one task and pass id as task function input, flags - bitmask composition of:
-
-- `TLocalExecutor::HIGH_PRIORITY = 0` - put task in high priority queue
-- `TLocalExecutor::MED_PRIORITY = 1` - put task in medium priority queue
-- `TLocalExecutor::LOW_PRIORITY = 2` - put task in low priority queue
-- `TLocalExecutor::WAIT_COMPLETE = 4` - wait for task completion
-
+All tasks should be `NPar::ILocallyExecutable` child class or function equal to `std::function<void(int)>`
+
+## TLocalExecutor methods
+
+`TLocalExecutor::Run(int threadcount)` - add threads to thread pool (**WARNING!** `Run(threadcount)` will *add* `threadcount` threads to pool)
+
+`void TLocalExecutor::Exec(TLocallyExecutableFunction exec, int id, int flags)` - run one task and pass id as task function input, flags - bitmask composition of:
+
+- `TLocalExecutor::HIGH_PRIORITY = 0` - put task in high priority queue
+- `TLocalExecutor::MED_PRIORITY = 1` - put task in medium priority queue
+- `TLocalExecutor::LOW_PRIORITY = 2` - put task in low priority queue
+- `TLocalExecutor::WAIT_COMPLETE = 4` - wait for task completion
+
`void TLocalExecutor::ExecRange(TLocallyExecutableFunction exec, TExecRangeParams blockParams, int flags);` - run range of tasks `[TExecRangeParams::FirstId, TExecRangeParams::LastId).`
-
+
`flags` is the same as for `TLocalExecutor::Exec`.
`TExecRangeParams` is a structure that describes the range.
@@ -33,32 +33,32 @@ It is also possible to partition range of tasks in consequtive blocks and execut
the range of tasks into consequtive blocks of approximately given size, or of size calculated
by partitioning the range into approximately equal size blocks of given count.
-## Examples
-
-### Simple task async exec with medium priority
-
-```cpp
-using namespace NPar;
-
-LocalExecutor().Run(4);
-TEvent event;
-LocalExecutor().Exec([](int) {
- SomeFunc();
- event.Signal();
-}, 0, TLocalExecutor::MED_PRIORITY);
-
-SomeOtherCode();
-event.WaitI();
-```
-
-### Execute task range and wait completion
-
-```cpp
-using namespace NPar;
-
-LocalExecutor().Run(4);
+## Examples
+
+### Simple task async exec with medium priority
+
+```cpp
+using namespace NPar;
+
+LocalExecutor().Run(4);
+TEvent event;
+LocalExecutor().Exec([](int) {
+ SomeFunc();
+ event.Signal();
+}, 0, TLocalExecutor::MED_PRIORITY);
+
+SomeOtherCode();
+event.WaitI();
+```
+
+### Execute task range and wait completion
+
+```cpp
+using namespace NPar;
+
+LocalExecutor().Run(4);
LocalExecutor().ExecRange([](int id) {
- SomeFunc(id);
+ SomeFunc(id);
}, TExecRangeParams(0, 10), TLocalExecutor::WAIT_COMPLETE | TLocalExecutor::MED_PRIORITY);
```
diff --git a/library/cpp/threading/local_executor/local_executor.cpp b/library/cpp/threading/local_executor/local_executor.cpp
index 1d3fbb4bf4..d19b48d0b1 100644
--- a/library/cpp/threading/local_executor/local_executor.cpp
+++ b/library/cpp/threading/local_executor/local_executor.cpp
@@ -13,12 +13,12 @@
#include <utility>
#ifdef _win_
-static void RegularYield() {
+static void RegularYield() {
}
#else
// unix actually has cooperative multitasking! :)
// without this function program runs slower and system lags for some magic reason
-static void RegularYield() {
+static void RegularYield() {
SchedYield();
}
#endif
@@ -28,12 +28,12 @@ namespace {
NPar::TLocallyExecutableFunction Exec;
TFunctionWrapper(NPar::TLocallyExecutableFunction exec)
: Exec(std::move(exec))
- {
- }
- void LocalExec(int id) override {
- Exec(id);
- }
- };
+ {
+ }
+ void LocalExec(int id) override {
+ Exec(id);
+ }
+ };
class TFunctionWrapperWithPromise: public NPar::ILocallyExecutable {
private:
@@ -73,7 +73,7 @@ namespace {
struct TSingleJob {
TIntrusivePtr<NPar::ILocallyExecutable> Exec;
int Id{0};
-
+
TSingleJob() = default;
TSingleJob(TIntrusivePtr<NPar::ILocallyExecutable> exec, int id)
: Exec(std::move(exec))
@@ -95,7 +95,7 @@ namespace {
break;
}
AtomicAdd(WorkerCount, -1);
- }
+ }
public:
TLocalRangeExecutor(TIntrusivePtr<ILocallyExecutable> exec, int firstId, int lastId)
@@ -104,7 +104,7 @@ namespace {
, WorkerCount(0)
, LastId(lastId)
{
- }
+ }
bool DoSingleOp() {
const int id = AtomicAdd(Counter, 1) - 1;
if (id >= LastId)
@@ -112,14 +112,14 @@ namespace {
Exec->LocalExec(id);
RegularYield();
return true;
- }
+ }
void WaitComplete() {
while (AtomicGet(WorkerCount) > 0)
RegularYield();
}
int GetRangeSize() const {
return Max<int>(LastId - Counter, 0);
- }
+ }
};
}
diff --git a/library/cpp/threading/local_executor/local_executor.h b/library/cpp/threading/local_executor/local_executor.h
index c1c824f67c..e12536bd46 100644
--- a/library/cpp/threading/local_executor/local_executor.h
+++ b/library/cpp/threading/local_executor/local_executor.h
@@ -11,15 +11,15 @@
#include <functional>
-namespace NPar {
- struct ILocallyExecutable : virtual public TThrRefBase {
+namespace NPar {
+ struct ILocallyExecutable : virtual public TThrRefBase {
// Must be implemented by the end user to define job that will be processed by one of
// executor threads.
//
// @param id Job parameter, typically an index pointing somewhere in array, or just
// some dummy value, e.g. `0`.
- virtual void LocalExec(int id) = 0;
- };
+ virtual void LocalExec(int id) = 0;
+ };
// Alternative and simpler way of describing a job for executor. Function argument has the
// same meaning as `id` in `ILocallyExecutable::LocalExec`.
@@ -27,17 +27,17 @@ namespace NPar {
using TLocallyExecutableFunction = std::function<void(int)>;
class ILocalExecutor: public TNonCopyable {
- public:
+ public:
ILocalExecutor() = default;
virtual ~ILocalExecutor() = default;
enum EFlags : int {
- HIGH_PRIORITY = 0,
- MED_PRIORITY = 1,
- LOW_PRIORITY = 2,
- PRIORITY_MASK = 3,
- WAIT_COMPLETE = 4
- };
+ HIGH_PRIORITY = 0,
+ MED_PRIORITY = 1,
+ LOW_PRIORITY = 2,
+ PRIORITY_MASK = 3,
+ WAIT_COMPLETE = 4
+ };
// Add task for further execution.
//
@@ -156,7 +156,7 @@ namespace NPar {
}
ExecRange(BlockedLoopBody(params, body), 0, params.GetBlockCount(), flags);
}
-
+
template <typename TBody>
inline void ExecRangeBlockedWithThrow(TBody&& body, int firstId, int lastId, int batchSizeOrZeroForAutoBatchSize, int flags) {
if (firstId >= lastId) {
@@ -270,8 +270,8 @@ namespace NPar {
};
static inline TLocalExecutor& LocalExecutor() {
- return *Singleton<TLocalExecutor>();
- }
+ return *Singleton<TLocalExecutor>();
+ }
template <typename TBody>
inline void ParallelFor(ILocalExecutor& executor, ui32 from, ui32 to, TBody&& body) {
diff --git a/library/cpp/threading/local_executor/ya.make b/library/cpp/threading/local_executor/ya.make
index df210f92bb..e340556678 100644
--- a/library/cpp/threading/local_executor/ya.make
+++ b/library/cpp/threading/local_executor/ya.make
@@ -1,8 +1,8 @@
OWNER(
- g:matrixnet
+ g:matrixnet
gulin
- kirillovs
- espetrov
+ kirillovs
+ espetrov
)
LIBRARY()
@@ -12,9 +12,9 @@ SRCS(
tbb_local_executor.cpp
)
-PEERDIR(
- contrib/libs/tbb
+PEERDIR(
+ contrib/libs/tbb
library/cpp/threading/future
-)
-
+)
+
END()
diff --git a/library/cpp/threading/poor_man_openmp/thread_helper.h b/library/cpp/threading/poor_man_openmp/thread_helper.h
index 0ecee0590b..d51cda8daa 100644
--- a/library/cpp/threading/poor_man_openmp/thread_helper.h
+++ b/library/cpp/threading/poor_man_openmp/thread_helper.h
@@ -1,45 +1,45 @@
-#pragma once
-
+#pragma once
+
#include <util/thread/pool.h>
-#include <util/generic/utility.h>
+#include <util/generic/utility.h>
#include <util/generic/yexception.h>
-#include <util/system/info.h>
-#include <util/system/atomic.h>
-#include <util/system/condvar.h>
-#include <util/system/mutex.h>
+#include <util/system/info.h>
+#include <util/system/atomic.h>
+#include <util/system/condvar.h>
+#include <util/system/mutex.h>
#include <util/stream/output.h>
-
+
#include <functional>
#include <cstdlib>
-
+
class TMtpQueueHelper {
-public:
+public:
TMtpQueueHelper() {
- SetThreadCount(NSystemInfo::CachedNumberOfCpus());
- }
+ SetThreadCount(NSystemInfo::CachedNumberOfCpus());
+ }
IThreadPool* Get() {
- return q.Get();
- }
- size_t GetThreadCount() {
- return ThreadCount;
- }
- void SetThreadCount(size_t threads) {
- ThreadCount = threads;
+ return q.Get();
+ }
+ size_t GetThreadCount() {
+ return ThreadCount;
+ }
+ void SetThreadCount(size_t threads) {
+ ThreadCount = threads;
q = CreateThreadPool(ThreadCount);
- }
+ }
static TMtpQueueHelper& Instance();
-private:
- size_t ThreadCount;
+private:
+ size_t ThreadCount;
TAutoPtr<IThreadPool> q;
-};
-
+};
+
namespace NYmp {
- inline void SetThreadCount(size_t threads) {
+ inline void SetThreadCount(size_t threads) {
TMtpQueueHelper::Instance().SetThreadCount(threads);
- }
-
+ }
+
inline size_t GetThreadCount() {
return TMtpQueueHelper::Instance().GetThreadCount();
}
@@ -50,9 +50,9 @@ namespace NYmp {
size_t threadCount = TMtpQueueHelper::Instance().GetThreadCount();
IThreadPool* queue = TMtpQueueHelper::Instance().Get();
- TCondVar cv;
- TMutex mutex;
- TAtomic counter = threadCount;
+ TCondVar cv;
+ TMutex mutex;
+ TAtomic counter = threadCount;
std::exception_ptr err;
for (size_t i = 0; i < threadCount; ++i) {
@@ -68,12 +68,12 @@ namespace NYmp {
}
currentChunkStart += chunkSize * threadCount;
- }
+ }
} catch (...) {
with_lock (mutex) {
err = std::current_exception();
}
- }
+ }
with_lock (mutex) {
if (AtomicDecrement(counter) == 0) {
@@ -81,25 +81,25 @@ namespace NYmp {
cv.Signal();
}
}
- });
- }
+ });
+ }
with_lock (mutex) {
while (AtomicGet(counter) > 0) {
cv.WaitI(mutex);
}
- }
+ }
if (err) {
std::rethrow_exception(err);
}
- }
-
+ }
+
template <typename T>
inline void ParallelForStaticAutoChunk(T begin, T end, std::function<void(T)> func) {
const size_t taskSize = end - begin;
const size_t threadCount = TMtpQueueHelper::Instance().GetThreadCount();
- ParallelForStaticChunk(begin, end, (taskSize + threadCount - 1) / threadCount, func);
- }
+ ParallelForStaticChunk(begin, end, (taskSize + threadCount - 1) / threadCount, func);
+ }
}