aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/threading/local_executor/ut
diff options
context:
space:
mode:
authorDevtools Arcadia <arcadia-devtools@yandex-team.ru>2022-02-07 18:08:42 +0300
committerDevtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net>2022-02-07 18:08:42 +0300
commit1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch)
treee26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/threading/local_executor/ut
downloadydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/threading/local_executor/ut')
-rw-r--r--library/cpp/threading/local_executor/ut/local_executor_ut.cpp371
-rw-r--r--library/cpp/threading/local_executor/ut/ya.make12
2 files changed, 383 insertions, 0 deletions
diff --git a/library/cpp/threading/local_executor/ut/local_executor_ut.cpp b/library/cpp/threading/local_executor/ut/local_executor_ut.cpp
new file mode 100644
index 0000000000..ac5737717c
--- /dev/null
+++ b/library/cpp/threading/local_executor/ut/local_executor_ut.cpp
@@ -0,0 +1,371 @@
+#include <library/cpp/threading/local_executor/local_executor.h>
+#include <library/cpp/threading/future/future.h>
+
+#include <library/cpp/testing/unittest/registar.h>
+#include <util/system/mutex.h>
+#include <util/system/rwlock.h>
+#include <util/generic/algorithm.h>
+
+using namespace NPar;
+
+class TTestException: public yexception {
+};
+
+static const int DefaultThreadsCount = 41;
+static const int DefaultRangeSize = 999;
+
+Y_UNIT_TEST_SUITE(ExecRangeWithFutures){
+ bool AllOf(const TVector<int>& vec, int value){
+ return AllOf(vec, [value](int element) { return value == element; });
+}
+
+void AsyncRunAndWaitFuturesReady(int rangeSize, int threads) {
+ TLocalExecutor localExecutor;
+ localExecutor.RunAdditionalThreads(threads);
+ TAtomic signal = 0;
+ TVector<int> data(rangeSize, 0);
+ TVector<NThreading::TFuture<void>> futures = localExecutor.ExecRangeWithFutures([&signal, &data](int i) {
+ UNIT_ASSERT(data[i] == 0);
+ while (AtomicGet(signal) == 0)
+ ;
+ data[i] += 1;
+ },
+ 0, rangeSize, TLocalExecutor::HIGH_PRIORITY);
+ UNIT_ASSERT(AllOf(data, 0));
+ for (auto& future : futures)
+ UNIT_ASSERT(!future.HasValue());
+ AtomicSet(signal, 1);
+ for (auto& future : futures) {
+ future.GetValueSync();
+ }
+ UNIT_ASSERT(AllOf(data, 1));
+}
+
+Y_UNIT_TEST(AsyncRunRangeAndWaitFuturesReady) {
+ AsyncRunAndWaitFuturesReady(DefaultRangeSize, DefaultThreadsCount);
+}
+
+Y_UNIT_TEST(AsyncRunOneTaskAndWaitFuturesReady) {
+ AsyncRunAndWaitFuturesReady(1, DefaultThreadsCount);
+}
+
+Y_UNIT_TEST(AsyncRunRangeAndWaitFuturesReadyOneExtraThread) {
+ AsyncRunAndWaitFuturesReady(DefaultRangeSize, 1);
+}
+
+Y_UNIT_TEST(AsyncRunOneThreadAndWaitFuturesReadyOneExtraThread) {
+ AsyncRunAndWaitFuturesReady(1, 1);
+}
+
+Y_UNIT_TEST(AsyncRunTwoRangesAndWaitFuturesReady) {
+ TLocalExecutor localExecutor;
+ localExecutor.RunAdditionalThreads(DefaultThreadsCount);
+ TAtomic signal = 0;
+ TVector<int> data1(DefaultRangeSize, 0);
+ TVector<NThreading::TFuture<void>> futures1 = localExecutor.ExecRangeWithFutures([&signal, &data1](int i) {
+ UNIT_ASSERT(data1[i] == 0);
+ while (AtomicGet(signal) == 0)
+ ;
+ data1[i] += 1;
+ },
+ 0, DefaultRangeSize, TLocalExecutor::HIGH_PRIORITY);
+ TVector<int> data2(DefaultRangeSize, 0);
+ TVector<NThreading::TFuture<void>> futures2 = localExecutor.ExecRangeWithFutures([&signal, &data2](int i) {
+ UNIT_ASSERT(data2[i] == 0);
+ while (AtomicGet(signal) == 0)
+ ;
+ data2[i] += 2;
+ },
+ 0, DefaultRangeSize, TLocalExecutor::HIGH_PRIORITY);
+ UNIT_ASSERT(AllOf(data1, 0));
+ UNIT_ASSERT(AllOf(data2, 0));
+ AtomicSet(signal, 1);
+ for (int i = 0; i < DefaultRangeSize; ++i) {
+ futures1[i].GetValueSync();
+ futures2[i].GetValueSync();
+ }
+ UNIT_ASSERT(AllOf(data1, 1));
+ UNIT_ASSERT(AllOf(data2, 2));
+}
+
+void AsyncRunRangeAndWaitExceptions(int rangeSize, int threadsCount) {
+ TLocalExecutor localExecutor;
+ localExecutor.RunAdditionalThreads(threadsCount);
+ TAtomic signal = 0;
+ TVector<int> data(rangeSize, 0);
+ TVector<NThreading::TFuture<void>> futures = localExecutor.ExecRangeWithFutures([&signal, &data](int i) {
+ UNIT_ASSERT(data[i] == 0);
+ while (AtomicGet(signal) == 0)
+ ;
+ data[i] += 1;
+ throw 10000 + i;
+ },
+ 0, rangeSize, TLocalExecutor::HIGH_PRIORITY);
+ UNIT_ASSERT(AllOf(data, 0));
+ UNIT_ASSERT(futures.ysize() == rangeSize);
+ AtomicSet(signal, 1);
+ int exceptionsCaught = 0;
+ for (int i = 0; i < rangeSize; ++i) {
+ try {
+ futures[i].GetValueSync();
+ } catch (int& e) {
+ if (e == 10000 + i) {
+ ++exceptionsCaught;
+ }
+ }
+ }
+ UNIT_ASSERT(exceptionsCaught == rangeSize);
+ UNIT_ASSERT(AllOf(data, 1));
+}
+
+Y_UNIT_TEST(AsyncRunRangeAndWaitExceptions) {
+ AsyncRunRangeAndWaitExceptions(DefaultRangeSize, DefaultThreadsCount);
+}
+
+Y_UNIT_TEST(AsyncRunOneTaskAndWaitExceptions) {
+ AsyncRunRangeAndWaitExceptions(1, DefaultThreadsCount);
+}
+
+Y_UNIT_TEST(AsyncRunRangeAndWaitExceptionsOneExtraThread) {
+ AsyncRunRangeAndWaitExceptions(DefaultRangeSize, 1);
+}
+
+Y_UNIT_TEST(AsyncRunOneTaskAndWaitExceptionsOneExtraThread) {
+ AsyncRunRangeAndWaitExceptions(1, 1);
+}
+
+Y_UNIT_TEST(AsyncRunTwoRangesAndWaitExceptions) {
+ TLocalExecutor localExecutor;
+ localExecutor.RunAdditionalThreads(DefaultThreadsCount);
+ TAtomic signal = 0;
+ TVector<int> data1(DefaultRangeSize, 0);
+ TVector<NThreading::TFuture<void>> futures1 = localExecutor.ExecRangeWithFutures([&signal, &data1](int i) {
+ UNIT_ASSERT(data1[i] == 0);
+ while (AtomicGet(signal) == 0)
+ ;
+ data1[i] += 1;
+ throw 15000 + i;
+ },
+ 0, DefaultRangeSize, TLocalExecutor::LOW_PRIORITY);
+ TVector<int> data2(DefaultRangeSize, 0);
+ TVector<NThreading::TFuture<void>> futures2 = localExecutor.ExecRangeWithFutures([&signal, &data2](int i) {
+ UNIT_ASSERT(data2[i] == 0);
+ while (AtomicGet(signal) == 0)
+ ;
+ data2[i] += 2;
+ throw 16000 + i;
+ },
+ 0, DefaultRangeSize, TLocalExecutor::HIGH_PRIORITY);
+
+ UNIT_ASSERT(AllOf(data1, 0));
+ UNIT_ASSERT(AllOf(data2, 0));
+ UNIT_ASSERT(futures1.size() == DefaultRangeSize);
+ UNIT_ASSERT(futures2.size() == DefaultRangeSize);
+ AtomicSet(signal, 1);
+ int exceptionsCaught = 0;
+ for (int i = 0; i < DefaultRangeSize; ++i) {
+ try {
+ futures1[i].GetValueSync();
+ } catch (int& e) {
+ if (e == 15000 + i) {
+ ++exceptionsCaught;
+ }
+ }
+ try {
+ futures2[i].GetValueSync();
+ } catch (int& e) {
+ if (e == 16000 + i) {
+ ++exceptionsCaught;
+ }
+ }
+ }
+ UNIT_ASSERT(exceptionsCaught == 2 * DefaultRangeSize);
+ UNIT_ASSERT(AllOf(data1, 1));
+ UNIT_ASSERT(AllOf(data2, 2));
+}
+
+void RunRangeAndCheckExceptionsWithWaitComplete(int rangeSize, int threadsCount) {
+ TLocalExecutor localExecutor;
+ localExecutor.RunAdditionalThreads(threadsCount);
+ TVector<int> data(rangeSize, 0);
+ TVector<NThreading::TFuture<void>> futures = localExecutor.ExecRangeWithFutures([&data](int i) {
+ UNIT_ASSERT(data[i] == 0);
+ data[i] += 1;
+ throw 30000 + i;
+ },
+ 0, rangeSize, TLocalExecutor::EFlags::WAIT_COMPLETE);
+ UNIT_ASSERT(AllOf(data, 1));
+ int exceptionsCaught = 0;
+ for (int i = 0; i < rangeSize; ++i) {
+ try {
+ futures[i].GetValueSync();
+ } catch (int& e) {
+ if (e == 30000 + i) {
+ ++exceptionsCaught;
+ }
+ }
+ }
+ UNIT_ASSERT(exceptionsCaught == rangeSize);
+ UNIT_ASSERT(AllOf(data, 1));
+}
+
+Y_UNIT_TEST(RunRangeAndCheckExceptionsWithWaitComplete) {
+ RunRangeAndCheckExceptionsWithWaitComplete(DefaultRangeSize, DefaultThreadsCount);
+}
+
+Y_UNIT_TEST(RunOneAndCheckExceptionsWithWaitComplete) {
+ RunRangeAndCheckExceptionsWithWaitComplete(1, DefaultThreadsCount);
+}
+
+Y_UNIT_TEST(RunRangeAndCheckExceptionsWithWaitCompleteOneExtraThread) {
+ RunRangeAndCheckExceptionsWithWaitComplete(DefaultRangeSize, 1);
+}
+
+Y_UNIT_TEST(RunOneAndCheckExceptionsWithWaitCompleteOneExtraThread) {
+ RunRangeAndCheckExceptionsWithWaitComplete(1, 1);
+}
+
+Y_UNIT_TEST(RunRangeAndCheckExceptionsWithWaitCompleteZeroExtraThreads) {
+ RunRangeAndCheckExceptionsWithWaitComplete(DefaultRangeSize, 0);
+}
+
+Y_UNIT_TEST(RunOneAndCheckExceptionsWithWaitCompleteZeroExtraThreads) {
+ RunRangeAndCheckExceptionsWithWaitComplete(1, 0);
+}
+}
+;
+
+Y_UNIT_TEST_SUITE(ExecRangeWithThrow){
+ void RunParallelWhichThrowsTTestException(int rangeStart, int rangeSize, int threadsCount, int flags, TAtomic& processed){
+ AtomicSet(processed, 0);
+TLocalExecutor localExecutor;
+localExecutor.RunAdditionalThreads(threadsCount);
+localExecutor.ExecRangeWithThrow([&processed](int) {
+ AtomicAdd(processed, 1);
+ throw TTestException();
+},
+ rangeStart, rangeStart + rangeSize, flags);
+}
+
+Y_UNIT_TEST(RunParallelWhichThrowsTTestException) {
+ TAtomic processed = 0;
+ UNIT_ASSERT_EXCEPTION(
+ RunParallelWhichThrowsTTestException(10, 40, DefaultThreadsCount,
+ TLocalExecutor::EFlags::WAIT_COMPLETE, processed),
+ TTestException);
+ UNIT_ASSERT(AtomicGet(processed) == 40);
+}
+
+void ThrowAndCatchTTestException(int rangeSize, int threadsCount, int flags) {
+ TAtomic processed = 0;
+ UNIT_ASSERT_EXCEPTION(
+ RunParallelWhichThrowsTTestException(0, rangeSize, threadsCount, flags, processed),
+ TTestException);
+ UNIT_ASSERT(AtomicGet(processed) == rangeSize);
+}
+
+Y_UNIT_TEST(ThrowAndCatchTTestExceptionLowPriority) {
+ ThrowAndCatchTTestException(DefaultRangeSize, DefaultThreadsCount,
+ TLocalExecutor::EFlags::WAIT_COMPLETE | TLocalExecutor::EFlags::LOW_PRIORITY);
+}
+
+Y_UNIT_TEST(ThrowAndCatchTTestExceptionMedPriority) {
+ ThrowAndCatchTTestException(DefaultRangeSize, DefaultThreadsCount,
+ TLocalExecutor::EFlags::WAIT_COMPLETE | TLocalExecutor::EFlags::MED_PRIORITY);
+}
+
+Y_UNIT_TEST(ThrowAndCatchTTestExceptionHighPriority) {
+ ThrowAndCatchTTestException(DefaultRangeSize, DefaultThreadsCount,
+ TLocalExecutor::EFlags::WAIT_COMPLETE | TLocalExecutor::EFlags::HIGH_PRIORITY);
+}
+
+Y_UNIT_TEST(ThrowAndCatchTTestExceptionWaitComplete) {
+ ThrowAndCatchTTestException(DefaultRangeSize, DefaultThreadsCount,
+ TLocalExecutor::EFlags::WAIT_COMPLETE);
+}
+
+Y_UNIT_TEST(RethrowExeptionSequentialWaitComplete) {
+ ThrowAndCatchTTestException(DefaultRangeSize, 0,
+ TLocalExecutor::EFlags::WAIT_COMPLETE);
+}
+
+Y_UNIT_TEST(RethrowExeptionOneExtraThreadWaitComplete) {
+ ThrowAndCatchTTestException(DefaultRangeSize, 1,
+ TLocalExecutor::EFlags::WAIT_COMPLETE);
+}
+
+void ThrowsTTestExceptionFromNested(TLocalExecutor& localExecutor) {
+ localExecutor.ExecRangeWithThrow([](int) {
+ throw TTestException();
+ },
+ 0, 10, TLocalExecutor::EFlags::WAIT_COMPLETE);
+}
+
+void CatchTTestExceptionFromNested(TAtomic& processed1, TAtomic& processed2) {
+ TLocalExecutor localExecutor;
+ localExecutor.RunAdditionalThreads(DefaultThreadsCount);
+ localExecutor.ExecRangeWithThrow([&processed1, &processed2, &localExecutor](int) {
+ AtomicAdd(processed1, 1);
+ UNIT_ASSERT_EXCEPTION(
+ ThrowsTTestExceptionFromNested(localExecutor),
+ TTestException);
+ AtomicAdd(processed2, 1);
+ },
+ 0, DefaultRangeSize, TLocalExecutor::EFlags::WAIT_COMPLETE);
+}
+
+Y_UNIT_TEST(NestedParallelExceptionsDoNotLeak) {
+ TAtomic processed1 = 0;
+ TAtomic processed2 = 0;
+ UNIT_ASSERT_NO_EXCEPTION(
+ CatchTTestExceptionFromNested(processed1, processed2));
+ UNIT_ASSERT_EQUAL(AtomicGet(processed1), DefaultRangeSize);
+ UNIT_ASSERT_EQUAL(AtomicGet(processed2), DefaultRangeSize);
+}
+}
+;
+
+Y_UNIT_TEST_SUITE(ExecLargeRangeWithThrow){
+
+ constexpr int LARGE_COUNT = 128 * (1 << 20);
+
+ static auto IsValue(char v) {
+ return [=](char c) { return c == v; };
+ }
+
+ Y_UNIT_TEST(ExecLargeRangeNoExceptions) {
+ TVector<char> tasks(LARGE_COUNT);
+
+ TLocalExecutor localExecutor;
+ localExecutor.RunAdditionalThreads(DefaultThreadsCount);
+
+ localExecutor.ExecRangeBlockedWithThrow([&tasks](int i) {
+ tasks[i] = 1;
+ }, 0, tasks.size(), 0, TLocalExecutor::EFlags::WAIT_COMPLETE);
+ UNIT_ASSERT(AllOf(tasks, IsValue(1)));
+
+
+ localExecutor.ExecRangeBlockedWithThrow([&tasks](int i) {
+ tasks[i] += 1;
+ }, 0, tasks.size(), 128, TLocalExecutor::EFlags::WAIT_COMPLETE);
+ UNIT_ASSERT(AllOf(tasks, IsValue(2)));
+ }
+
+ Y_UNIT_TEST(ExecLargeRangeWithException) {
+ TVector<char> tasks(LARGE_COUNT);
+
+ TLocalExecutor localExecutor;
+ localExecutor.RunAdditionalThreads(DefaultThreadsCount);
+
+ Fill(tasks.begin(), tasks.end(), 0);
+ UNIT_ASSERT_EXCEPTION(
+ localExecutor.ExecRangeBlockedWithThrow([&tasks](int i) {
+ tasks[i] += 1;
+ if (i == LARGE_COUNT / 2) {
+ throw TTestException();
+ }
+ }, 0, tasks.size(), 0, TLocalExecutor::EFlags::WAIT_COMPLETE),
+ TTestException
+ );
+ }
+};
diff --git a/library/cpp/threading/local_executor/ut/ya.make b/library/cpp/threading/local_executor/ut/ya.make
new file mode 100644
index 0000000000..be579a5ca0
--- /dev/null
+++ b/library/cpp/threading/local_executor/ut/ya.make
@@ -0,0 +1,12 @@
+OWNER(
+ g:matrixnet
+ gulin
+)
+
+UNITTEST_FOR(library/cpp/threading/local_executor)
+
+SRCS(
+ local_executor_ut.cpp
+)
+
+END()