diff options
author | cobat <cobat@yandex-team.ru> | 2022-02-10 16:49:07 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:49:07 +0300 |
commit | e486e109b08823b61996f2154f0bc6b7c27a4af4 (patch) | |
tree | cb98553f871fe96452fd02bd46a1a4e0cf165844 /library/cpp/threading | |
parent | 85e7b8b43a12f69b4721aee71dea28491c9bd503 (diff) | |
download | ydb-e486e109b08823b61996f2154f0bc6b7c27a4af4.tar.gz |
Restoring authorship annotation for <cobat@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/threading')
5 files changed, 114 insertions, 114 deletions
diff --git a/library/cpp/threading/local_executor/README.md b/library/cpp/threading/local_executor/README.md index aaad2e2986..6191f76b9f 100644 --- a/library/cpp/threading/local_executor/README.md +++ b/library/cpp/threading/local_executor/README.md @@ -2,7 +2,7 @@ This library allows easy parallelization of existing code and cycles. It provides `NPar::TLocalExecutor` class and `NPar::LocalExecutor()` singleton accessor. -At start, `TLocalExecutor` has no threads in thread pool and all async tasks will be queued for later execution when extra threads appear. +At start, `TLocalExecutor` has no threads in thread pool and all async tasks will be queued for later execution when extra threads appear. All tasks should be `NPar::ILocallyExecutable` child class or function equal to `std::function<void(int)>` ## TLocalExecutor methods @@ -16,20 +16,20 @@ All tasks should be `NPar::ILocallyExecutable` child class or function equal to - `TLocalExecutor::LOW_PRIORITY = 2` - put task in low priority queue - `TLocalExecutor::WAIT_COMPLETE = 4` - wait for task completion -`void TLocalExecutor::ExecRange(TLocallyExecutableFunction exec, TExecRangeParams blockParams, int flags);` - run range of tasks `[TExecRangeParams::FirstId, TExecRangeParams::LastId).` +`void TLocalExecutor::ExecRange(TLocallyExecutableFunction exec, TExecRangeParams blockParams, int flags);` - run range of tasks `[TExecRangeParams::FirstId, TExecRangeParams::LastId).` `flags` is the same as for `TLocalExecutor::Exec`. -`TExecRangeParams` is a structure that describes the range. +`TExecRangeParams` is a structure that describes the range. By default each task is executed separately. Threads from thread pool are taking the tasks in the manner first come first serve. It is also possible to partition range of tasks in consequtive blocks and execute each block as a bigger task. -`TExecRangeParams::SetBlockCountToThreadCount()` will result in thread count tasks, +`TExecRangeParams::SetBlockCountToThreadCount()` will result in thread count tasks, where thread count is the count of threads in thread pool. each thread will execute approximately equal count of tasks from range. -`TExecRangeParams::SetBlockSize()` and `TExecRangeParams::SetBlockCount()` will partition +`TExecRangeParams::SetBlockSize()` and `TExecRangeParams::SetBlockCount()` will partition the range of tasks into consequtive blocks of approximately given size, or of size calculated by partitioning the range into approximately equal size blocks of given count. @@ -59,16 +59,16 @@ using namespace NPar; LocalExecutor().Run(4); LocalExecutor().ExecRange([](int id) { SomeFunc(id); -}, TExecRangeParams(0, 10), TLocalExecutor::WAIT_COMPLETE | TLocalExecutor::MED_PRIORITY); +}, TExecRangeParams(0, 10), TLocalExecutor::WAIT_COMPLETE | TLocalExecutor::MED_PRIORITY); ``` - -### Exception handling - -By default if a not caught exception arise in a task which runs through the Local Executor, then std::terminate() will be called immediately. The exception will be printed to stderr before the termination. Best practice is to handle exception within a task, or avoid throwing exceptions at all for performance reasons. - -However, if you'd like to handle and/or rethrow exceptions outside of a range, you can use ExecRangeWithFuture(). -It returns vector [0 .. LastId-FirstId] elements, where i-th element is a TFuture corresponding to task with id = (FirstId + i). -Use method .HasValue() of the element to check in Async mode if the corresponding task is complete. -Use .GetValue() or .GetValueSync() to wait for completion of the corresponding task. GetValue() and GetValueSync() will also rethrow an exception if it appears during execution of the task. - -You may also use ExecRangeWithThrow() to just receive an exception from a range if it appears. It rethrows an exception from a task with minimal id if such an exception exists, and guarantees normal flow if no exception arise. + +### Exception handling + +By default if a not caught exception arise in a task which runs through the Local Executor, then std::terminate() will be called immediately. The exception will be printed to stderr before the termination. Best practice is to handle exception within a task, or avoid throwing exceptions at all for performance reasons. + +However, if you'd like to handle and/or rethrow exceptions outside of a range, you can use ExecRangeWithFuture(). +It returns vector [0 .. LastId-FirstId] elements, where i-th element is a TFuture corresponding to task with id = (FirstId + i). +Use method .HasValue() of the element to check in Async mode if the corresponding task is complete. +Use .GetValue() or .GetValueSync() to wait for completion of the corresponding task. GetValue() and GetValueSync() will also rethrow an exception if it appears during execution of the task. + +You may also use ExecRangeWithThrow() to just receive an exception from a range if it appears. It rethrows an exception from a task with minimal id if such an exception exists, and guarantees normal flow if no exception arise. diff --git a/library/cpp/threading/local_executor/local_executor.cpp b/library/cpp/threading/local_executor/local_executor.cpp index 1d3fbb4bf4..2605a35166 100644 --- a/library/cpp/threading/local_executor/local_executor.cpp +++ b/library/cpp/threading/local_executor/local_executor.cpp @@ -36,40 +36,40 @@ namespace { }; class TFunctionWrapperWithPromise: public NPar::ILocallyExecutable { - private: + private: NPar::TLocallyExecutableFunction Exec; - int FirstId, LastId; - TVector<NThreading::TPromise<void>> Promises; + int FirstId, LastId; + TVector<NThreading::TPromise<void>> Promises; - public: + public: TFunctionWrapperWithPromise(NPar::TLocallyExecutableFunction exec, int firstId, int lastId) : Exec(std::move(exec)) - , FirstId(firstId) - , LastId(lastId) - { - Y_ASSERT(FirstId <= LastId); - const int rangeSize = LastId - FirstId; - Promises.resize(rangeSize, NThreading::NewPromise()); - for (auto& promise : Promises) { - promise = NThreading::NewPromise(); - } - } - - void LocalExec(int id) override { - Y_ASSERT(FirstId <= id && id < LastId); - NThreading::NImpl::SetValue(Promises[id - FirstId], [=] { Exec(id); }); - } - - TVector<NThreading::TFuture<void>> GetFutures() const { - TVector<NThreading::TFuture<void>> out; - out.reserve(Promises.ysize()); - for (auto& promise : Promises) { - out.push_back(promise.GetFuture()); - } - return out; - } - }; - + , FirstId(firstId) + , LastId(lastId) + { + Y_ASSERT(FirstId <= LastId); + const int rangeSize = LastId - FirstId; + Promises.resize(rangeSize, NThreading::NewPromise()); + for (auto& promise : Promises) { + promise = NThreading::NewPromise(); + } + } + + void LocalExec(int id) override { + Y_ASSERT(FirstId <= id && id < LastId); + NThreading::NImpl::SetValue(Promises[id - FirstId], [=] { Exec(id); }); + } + + TVector<NThreading::TFuture<void>> GetFutures() const { + TVector<NThreading::TFuture<void>> out; + out.reserve(Promises.ysize()); + for (auto& promise : Promises) { + out.push_back(promise.GetFuture()); + } + return out; + } + }; + struct TSingleJob { TIntrusivePtr<NPar::ILocallyExecutable> Exec; int Id{0}; diff --git a/library/cpp/threading/local_executor/local_executor.h b/library/cpp/threading/local_executor/local_executor.h index c1c824f67c..d661756008 100644 --- a/library/cpp/threading/local_executor/local_executor.h +++ b/library/cpp/threading/local_executor/local_executor.h @@ -60,7 +60,7 @@ namespace NPar { // Describes a range of tasks with parameters from integer range [FirstId, LastId). // - class TExecRangeParams { + class TExecRangeParams { public: template <typename TFirst, typename TLast> TExecRangeParams(TFirst firstId, TLast lastId) @@ -95,7 +95,7 @@ namespace NPar { // Partition tasks into thread count blocks of approximately equal size, each of which // will be executed as a separate bigger task. // - TExecRangeParams& SetBlockCountToThreadCount() { + TExecRangeParams& SetBlockCountToThreadCount() { BlockEqualToThreads = true; return *this; } @@ -147,7 +147,7 @@ namespace NPar { } template <typename TBody> - inline void ExecRange(TBody&& body, TExecRangeParams params, int flags) { + inline void ExecRange(TBody&& body, TExecRangeParams params, int flags) { if (TryExecRangeSequentially(body, params.FirstId, params.LastId, flags)) { return; } diff --git a/library/cpp/threading/local_executor/ut/local_executor_ut.cpp b/library/cpp/threading/local_executor/ut/local_executor_ut.cpp index ac5737717c..0ad343a279 100644 --- a/library/cpp/threading/local_executor/ut/local_executor_ut.cpp +++ b/library/cpp/threading/local_executor/ut/local_executor_ut.cpp @@ -2,23 +2,23 @@ #include <library/cpp/threading/future/future.h> #include <library/cpp/testing/unittest/registar.h> -#include <util/system/mutex.h> -#include <util/system/rwlock.h> +#include <util/system/mutex.h> +#include <util/system/rwlock.h> #include <util/generic/algorithm.h> - -using namespace NPar; - -class TTestException: public yexception { -}; - -static const int DefaultThreadsCount = 41; -static const int DefaultRangeSize = 999; - + +using namespace NPar; + +class TTestException: public yexception { +}; + +static const int DefaultThreadsCount = 41; +static const int DefaultRangeSize = 999; + Y_UNIT_TEST_SUITE(ExecRangeWithFutures){ bool AllOf(const TVector<int>& vec, int value){ - return AllOf(vec, [value](int element) { return value == element; }); + return AllOf(vec, [value](int element) { return value == element; }); } - + void AsyncRunAndWaitFuturesReady(int rangeSize, int threads) { TLocalExecutor localExecutor; localExecutor.RunAdditionalThreads(threads); @@ -37,26 +37,26 @@ void AsyncRunAndWaitFuturesReady(int rangeSize, int threads) { AtomicSet(signal, 1); for (auto& future : futures) { future.GetValueSync(); - } + } UNIT_ASSERT(AllOf(data, 1)); } - + Y_UNIT_TEST(AsyncRunRangeAndWaitFuturesReady) { AsyncRunAndWaitFuturesReady(DefaultRangeSize, DefaultThreadsCount); } - + Y_UNIT_TEST(AsyncRunOneTaskAndWaitFuturesReady) { AsyncRunAndWaitFuturesReady(1, DefaultThreadsCount); } - + Y_UNIT_TEST(AsyncRunRangeAndWaitFuturesReadyOneExtraThread) { AsyncRunAndWaitFuturesReady(DefaultRangeSize, 1); } - + Y_UNIT_TEST(AsyncRunOneThreadAndWaitFuturesReadyOneExtraThread) { AsyncRunAndWaitFuturesReady(1, 1); } - + Y_UNIT_TEST(AsyncRunTwoRangesAndWaitFuturesReady) { TLocalExecutor localExecutor; localExecutor.RunAdditionalThreads(DefaultThreadsCount); @@ -83,11 +83,11 @@ Y_UNIT_TEST(AsyncRunTwoRangesAndWaitFuturesReady) { for (int i = 0; i < DefaultRangeSize; ++i) { futures1[i].GetValueSync(); futures2[i].GetValueSync(); - } + } UNIT_ASSERT(AllOf(data1, 1)); UNIT_ASSERT(AllOf(data2, 2)); } - + void AsyncRunRangeAndWaitExceptions(int rangeSize, int threadsCount) { TLocalExecutor localExecutor; localExecutor.RunAdditionalThreads(threadsCount); @@ -111,29 +111,29 @@ void AsyncRunRangeAndWaitExceptions(int rangeSize, int threadsCount) { } catch (int& e) { if (e == 10000 + i) { ++exceptionsCaught; - } - } - } + } + } + } UNIT_ASSERT(exceptionsCaught == rangeSize); UNIT_ASSERT(AllOf(data, 1)); } - + Y_UNIT_TEST(AsyncRunRangeAndWaitExceptions) { AsyncRunRangeAndWaitExceptions(DefaultRangeSize, DefaultThreadsCount); } - + Y_UNIT_TEST(AsyncRunOneTaskAndWaitExceptions) { AsyncRunRangeAndWaitExceptions(1, DefaultThreadsCount); } - + Y_UNIT_TEST(AsyncRunRangeAndWaitExceptionsOneExtraThread) { AsyncRunRangeAndWaitExceptions(DefaultRangeSize, 1); } - + Y_UNIT_TEST(AsyncRunOneTaskAndWaitExceptionsOneExtraThread) { AsyncRunRangeAndWaitExceptions(1, 1); } - + Y_UNIT_TEST(AsyncRunTwoRangesAndWaitExceptions) { TLocalExecutor localExecutor; localExecutor.RunAdditionalThreads(DefaultThreadsCount); @@ -156,7 +156,7 @@ Y_UNIT_TEST(AsyncRunTwoRangesAndWaitExceptions) { throw 16000 + i; }, 0, DefaultRangeSize, TLocalExecutor::HIGH_PRIORITY); - + UNIT_ASSERT(AllOf(data1, 0)); UNIT_ASSERT(AllOf(data2, 0)); UNIT_ASSERT(futures1.size() == DefaultRangeSize); @@ -169,21 +169,21 @@ Y_UNIT_TEST(AsyncRunTwoRangesAndWaitExceptions) { } catch (int& e) { if (e == 15000 + i) { ++exceptionsCaught; - } + } } try { futures2[i].GetValueSync(); } catch (int& e) { if (e == 16000 + i) { ++exceptionsCaught; - } - } - } + } + } + } UNIT_ASSERT(exceptionsCaught == 2 * DefaultRangeSize); UNIT_ASSERT(AllOf(data1, 1)); UNIT_ASSERT(AllOf(data2, 2)); } - + void RunRangeAndCheckExceptionsWithWaitComplete(int rangeSize, int threadsCount) { TLocalExecutor localExecutor; localExecutor.RunAdditionalThreads(threadsCount); @@ -202,39 +202,39 @@ void RunRangeAndCheckExceptionsWithWaitComplete(int rangeSize, int threadsCount) } catch (int& e) { if (e == 30000 + i) { ++exceptionsCaught; - } - } - } + } + } + } UNIT_ASSERT(exceptionsCaught == rangeSize); UNIT_ASSERT(AllOf(data, 1)); } - + Y_UNIT_TEST(RunRangeAndCheckExceptionsWithWaitComplete) { RunRangeAndCheckExceptionsWithWaitComplete(DefaultRangeSize, DefaultThreadsCount); } - + Y_UNIT_TEST(RunOneAndCheckExceptionsWithWaitComplete) { RunRangeAndCheckExceptionsWithWaitComplete(1, DefaultThreadsCount); } - + Y_UNIT_TEST(RunRangeAndCheckExceptionsWithWaitCompleteOneExtraThread) { RunRangeAndCheckExceptionsWithWaitComplete(DefaultRangeSize, 1); } - + Y_UNIT_TEST(RunOneAndCheckExceptionsWithWaitCompleteOneExtraThread) { RunRangeAndCheckExceptionsWithWaitComplete(1, 1); } - + Y_UNIT_TEST(RunRangeAndCheckExceptionsWithWaitCompleteZeroExtraThreads) { RunRangeAndCheckExceptionsWithWaitComplete(DefaultRangeSize, 0); } - + Y_UNIT_TEST(RunOneAndCheckExceptionsWithWaitCompleteZeroExtraThreads) { RunRangeAndCheckExceptionsWithWaitComplete(1, 0); } } ; - + Y_UNIT_TEST_SUITE(ExecRangeWithThrow){ void RunParallelWhichThrowsTTestException(int rangeStart, int rangeSize, int threadsCount, int flags, TAtomic& processed){ AtomicSet(processed, 0); @@ -246,7 +246,7 @@ localExecutor.ExecRangeWithThrow([&processed](int) { }, rangeStart, rangeStart + rangeSize, flags); } - + Y_UNIT_TEST(RunParallelWhichThrowsTTestException) { TAtomic processed = 0; UNIT_ASSERT_EXCEPTION( @@ -255,7 +255,7 @@ Y_UNIT_TEST(RunParallelWhichThrowsTTestException) { TTestException); UNIT_ASSERT(AtomicGet(processed) == 40); } - + void ThrowAndCatchTTestException(int rangeSize, int threadsCount, int flags) { TAtomic processed = 0; UNIT_ASSERT_EXCEPTION( @@ -263,44 +263,44 @@ void ThrowAndCatchTTestException(int rangeSize, int threadsCount, int flags) { TTestException); UNIT_ASSERT(AtomicGet(processed) == rangeSize); } - + Y_UNIT_TEST(ThrowAndCatchTTestExceptionLowPriority) { ThrowAndCatchTTestException(DefaultRangeSize, DefaultThreadsCount, TLocalExecutor::EFlags::WAIT_COMPLETE | TLocalExecutor::EFlags::LOW_PRIORITY); } - + Y_UNIT_TEST(ThrowAndCatchTTestExceptionMedPriority) { ThrowAndCatchTTestException(DefaultRangeSize, DefaultThreadsCount, TLocalExecutor::EFlags::WAIT_COMPLETE | TLocalExecutor::EFlags::MED_PRIORITY); } - + Y_UNIT_TEST(ThrowAndCatchTTestExceptionHighPriority) { ThrowAndCatchTTestException(DefaultRangeSize, DefaultThreadsCount, TLocalExecutor::EFlags::WAIT_COMPLETE | TLocalExecutor::EFlags::HIGH_PRIORITY); } - + Y_UNIT_TEST(ThrowAndCatchTTestExceptionWaitComplete) { ThrowAndCatchTTestException(DefaultRangeSize, DefaultThreadsCount, TLocalExecutor::EFlags::WAIT_COMPLETE); } - + Y_UNIT_TEST(RethrowExeptionSequentialWaitComplete) { ThrowAndCatchTTestException(DefaultRangeSize, 0, TLocalExecutor::EFlags::WAIT_COMPLETE); } - + Y_UNIT_TEST(RethrowExeptionOneExtraThreadWaitComplete) { ThrowAndCatchTTestException(DefaultRangeSize, 1, TLocalExecutor::EFlags::WAIT_COMPLETE); } - + void ThrowsTTestExceptionFromNested(TLocalExecutor& localExecutor) { localExecutor.ExecRangeWithThrow([](int) { throw TTestException(); }, 0, 10, TLocalExecutor::EFlags::WAIT_COMPLETE); } - + void CatchTTestExceptionFromNested(TAtomic& processed1, TAtomic& processed2) { TLocalExecutor localExecutor; localExecutor.RunAdditionalThreads(DefaultThreadsCount); @@ -313,7 +313,7 @@ void CatchTTestExceptionFromNested(TAtomic& processed1, TAtomic& processed2) { }, 0, DefaultRangeSize, TLocalExecutor::EFlags::WAIT_COMPLETE); } - + Y_UNIT_TEST(NestedParallelExceptionsDoNotLeak) { TAtomic processed1 = 0; TAtomic processed2 = 0; diff --git a/library/cpp/threading/local_executor/ut/ya.make b/library/cpp/threading/local_executor/ut/ya.make index be579a5ca0..645ce823e9 100644 --- a/library/cpp/threading/local_executor/ut/ya.make +++ b/library/cpp/threading/local_executor/ut/ya.make @@ -1,12 +1,12 @@ -OWNER( - g:matrixnet +OWNER( + g:matrixnet gulin ) - + UNITTEST_FOR(library/cpp/threading/local_executor) -SRCS( - local_executor_ut.cpp -) - -END() +SRCS( + local_executor_ut.cpp +) + +END() |