diff options
author | Dmitry Baksheev <dbakshee@yandex.ru> | 2022-02-10 16:48:12 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:48:12 +0300 |
commit | 0e38f1d675a0b3d02016acf698e8d04c0b224047 (patch) | |
tree | b222e5ac2e2e98872661c51ccceee5da0d291e13 /library | |
parent | 17fe552c0aa936de030b2b72934d9688ab9bb1c6 (diff) | |
download | ydb-0e38f1d675a0b3d02016acf698e8d04c0b224047.tar.gz |
Restoring authorship annotation for Dmitry Baksheev <dbakshee@yandex.ru>. Commit 2 of 2.
Diffstat (limited to 'library')
-rw-r--r-- | library/cpp/lfalloc/lf_allocX64.h | 6 | ||||
-rw-r--r-- | library/cpp/threading/local_executor/local_executor.h | 66 | ||||
-rw-r--r-- | library/cpp/threading/local_executor/ut/local_executor_ut.cpp | 90 |
3 files changed, 81 insertions, 81 deletions
diff --git a/library/cpp/lfalloc/lf_allocX64.h b/library/cpp/lfalloc/lf_allocX64.h index c0ed5ec169..fd2a906d6f 100644 --- a/library/cpp/lfalloc/lf_allocX64.h +++ b/library/cpp/lfalloc/lf_allocX64.h @@ -135,9 +135,9 @@ static bool EnableDefrag = true; template <class T> inline T* DoCas(T* volatile* target, T* exchange, T* compare) { -#if defined(__has_builtin) && __has_builtin(__sync_val_compare_and_swap) - return __sync_val_compare_and_swap(target, compare, exchange); -#elif defined(_WIN32) +#if defined(__has_builtin) && __has_builtin(__sync_val_compare_and_swap) + return __sync_val_compare_and_swap(target, compare, exchange); +#elif defined(_WIN32) #ifdef _64_ return (T*)_InterlockedCompareExchange64((__int64*)target, (__int64)exchange, (__int64)compare); #else diff --git a/library/cpp/threading/local_executor/local_executor.h b/library/cpp/threading/local_executor/local_executor.h index dd75382ba1..c1c824f67c 100644 --- a/library/cpp/threading/local_executor/local_executor.h +++ b/library/cpp/threading/local_executor/local_executor.h @@ -21,11 +21,11 @@ namespace NPar { virtual void LocalExec(int id) = 0; }; - // Alternative and simpler way of describing a job for executor. Function argument has the - // same meaning as `id` in `ILocallyExecutable::LocalExec`. - // - using TLocallyExecutableFunction = std::function<void(int)>; - + // Alternative and simpler way of describing a job for executor. Function argument has the + // same meaning as `id` in `ILocallyExecutable::LocalExec`. + // + using TLocallyExecutableFunction = std::function<void(int)>; + class ILocalExecutor: public TNonCopyable { public: ILocalExecutor() = default; @@ -157,34 +157,34 @@ namespace NPar { ExecRange(BlockedLoopBody(params, body), 0, params.GetBlockCount(), flags); } - template <typename TBody> - inline void ExecRangeBlockedWithThrow(TBody&& body, int firstId, int lastId, int batchSizeOrZeroForAutoBatchSize, int flags) { - if (firstId >= lastId) { - return; - } - const int threadCount = Max(GetThreadCount(), 1); - const int batchSize = batchSizeOrZeroForAutoBatchSize - ? batchSizeOrZeroForAutoBatchSize - : (lastId - firstId + threadCount - 1) / threadCount; - const int batchCount = (lastId - firstId + batchSize - 1) / batchSize; - const int batchCountPerThread = (batchCount + threadCount - 1) / threadCount; - auto states = ExecRangeWithFutures( - [=](int threadId) { - for (int batchIdPerThread = 0; batchIdPerThread < batchCountPerThread; ++batchIdPerThread) { - int batchId = batchIdPerThread * threadCount + threadId; - int begin = firstId + batchId * batchSize; - int end = Min(begin + batchSize, lastId); - for (int i = begin; i < end; ++i) { - body(i); - } - } - }, - 0, threadCount, flags); - for (auto& state: states) { - state.GetValueSync(); // Re-throw exception if any. - } - } - + template <typename TBody> + inline void ExecRangeBlockedWithThrow(TBody&& body, int firstId, int lastId, int batchSizeOrZeroForAutoBatchSize, int flags) { + if (firstId >= lastId) { + return; + } + const int threadCount = Max(GetThreadCount(), 1); + const int batchSize = batchSizeOrZeroForAutoBatchSize + ? batchSizeOrZeroForAutoBatchSize + : (lastId - firstId + threadCount - 1) / threadCount; + const int batchCount = (lastId - firstId + batchSize - 1) / batchSize; + const int batchCountPerThread = (batchCount + threadCount - 1) / threadCount; + auto states = ExecRangeWithFutures( + [=](int threadId) { + for (int batchIdPerThread = 0; batchIdPerThread < batchCountPerThread; ++batchIdPerThread) { + int batchId = batchIdPerThread * threadCount + threadId; + int begin = firstId + batchId * batchSize; + int end = Min(begin + batchSize, lastId); + for (int i = begin; i < end; ++i) { + body(i); + } + } + }, + 0, threadCount, flags); + for (auto& state: states) { + state.GetValueSync(); // Re-throw exception if any. + } + } + template <typename TBody> static inline bool TryExecRangeSequentially(TBody&& body, int firstId, int lastId, int flags) { if (lastId == firstId) { diff --git a/library/cpp/threading/local_executor/ut/local_executor_ut.cpp b/library/cpp/threading/local_executor/ut/local_executor_ut.cpp index a169edc2ec..ac5737717c 100644 --- a/library/cpp/threading/local_executor/ut/local_executor_ut.cpp +++ b/library/cpp/threading/local_executor/ut/local_executor_ut.cpp @@ -324,48 +324,48 @@ Y_UNIT_TEST(NestedParallelExceptionsDoNotLeak) { } } ; - -Y_UNIT_TEST_SUITE(ExecLargeRangeWithThrow){ - - constexpr int LARGE_COUNT = 128 * (1 << 20); - - static auto IsValue(char v) { - return [=](char c) { return c == v; }; - } - - Y_UNIT_TEST(ExecLargeRangeNoExceptions) { - TVector<char> tasks(LARGE_COUNT); - - TLocalExecutor localExecutor; - localExecutor.RunAdditionalThreads(DefaultThreadsCount); - - localExecutor.ExecRangeBlockedWithThrow([&tasks](int i) { - tasks[i] = 1; - }, 0, tasks.size(), 0, TLocalExecutor::EFlags::WAIT_COMPLETE); - UNIT_ASSERT(AllOf(tasks, IsValue(1))); - - - localExecutor.ExecRangeBlockedWithThrow([&tasks](int i) { - tasks[i] += 1; - }, 0, tasks.size(), 128, TLocalExecutor::EFlags::WAIT_COMPLETE); - UNIT_ASSERT(AllOf(tasks, IsValue(2))); - } - - Y_UNIT_TEST(ExecLargeRangeWithException) { - TVector<char> tasks(LARGE_COUNT); - - TLocalExecutor localExecutor; - localExecutor.RunAdditionalThreads(DefaultThreadsCount); - - Fill(tasks.begin(), tasks.end(), 0); - UNIT_ASSERT_EXCEPTION( - localExecutor.ExecRangeBlockedWithThrow([&tasks](int i) { - tasks[i] += 1; - if (i == LARGE_COUNT / 2) { - throw TTestException(); - } - }, 0, tasks.size(), 0, TLocalExecutor::EFlags::WAIT_COMPLETE), - TTestException - ); - } -}; + +Y_UNIT_TEST_SUITE(ExecLargeRangeWithThrow){ + + constexpr int LARGE_COUNT = 128 * (1 << 20); + + static auto IsValue(char v) { + return [=](char c) { return c == v; }; + } + + Y_UNIT_TEST(ExecLargeRangeNoExceptions) { + TVector<char> tasks(LARGE_COUNT); + + TLocalExecutor localExecutor; + localExecutor.RunAdditionalThreads(DefaultThreadsCount); + + localExecutor.ExecRangeBlockedWithThrow([&tasks](int i) { + tasks[i] = 1; + }, 0, tasks.size(), 0, TLocalExecutor::EFlags::WAIT_COMPLETE); + UNIT_ASSERT(AllOf(tasks, IsValue(1))); + + + localExecutor.ExecRangeBlockedWithThrow([&tasks](int i) { + tasks[i] += 1; + }, 0, tasks.size(), 128, TLocalExecutor::EFlags::WAIT_COMPLETE); + UNIT_ASSERT(AllOf(tasks, IsValue(2))); + } + + Y_UNIT_TEST(ExecLargeRangeWithException) { + TVector<char> tasks(LARGE_COUNT); + + TLocalExecutor localExecutor; + localExecutor.RunAdditionalThreads(DefaultThreadsCount); + + Fill(tasks.begin(), tasks.end(), 0); + UNIT_ASSERT_EXCEPTION( + localExecutor.ExecRangeBlockedWithThrow([&tasks](int i) { + tasks[i] += 1; + if (i == LARGE_COUNT / 2) { + throw TTestException(); + } + }, 0, tasks.size(), 0, TLocalExecutor::EFlags::WAIT_COMPLETE), + TTestException + ); + } +}; |