aboutsummaryrefslogtreecommitdiffstats
path: root/library
diff options
context:
space:
mode:
authorDmitry Baksheev <dbakshee@yandex.ru>2022-02-10 16:48:12 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:48:12 +0300
commit0e38f1d675a0b3d02016acf698e8d04c0b224047 (patch)
treeb222e5ac2e2e98872661c51ccceee5da0d291e13 /library
parent17fe552c0aa936de030b2b72934d9688ab9bb1c6 (diff)
downloadydb-0e38f1d675a0b3d02016acf698e8d04c0b224047.tar.gz
Restoring authorship annotation for Dmitry Baksheev <dbakshee@yandex.ru>. Commit 2 of 2.
Diffstat (limited to 'library')
-rw-r--r--library/cpp/lfalloc/lf_allocX64.h6
-rw-r--r--library/cpp/threading/local_executor/local_executor.h66
-rw-r--r--library/cpp/threading/local_executor/ut/local_executor_ut.cpp90
3 files changed, 81 insertions, 81 deletions
diff --git a/library/cpp/lfalloc/lf_allocX64.h b/library/cpp/lfalloc/lf_allocX64.h
index c0ed5ec169..fd2a906d6f 100644
--- a/library/cpp/lfalloc/lf_allocX64.h
+++ b/library/cpp/lfalloc/lf_allocX64.h
@@ -135,9 +135,9 @@ static bool EnableDefrag = true;
template <class T>
inline T* DoCas(T* volatile* target, T* exchange, T* compare) {
-#if defined(__has_builtin) && __has_builtin(__sync_val_compare_and_swap)
- return __sync_val_compare_and_swap(target, compare, exchange);
-#elif defined(_WIN32)
+#if defined(__has_builtin) && __has_builtin(__sync_val_compare_and_swap)
+ return __sync_val_compare_and_swap(target, compare, exchange);
+#elif defined(_WIN32)
#ifdef _64_
return (T*)_InterlockedCompareExchange64((__int64*)target, (__int64)exchange, (__int64)compare);
#else
diff --git a/library/cpp/threading/local_executor/local_executor.h b/library/cpp/threading/local_executor/local_executor.h
index dd75382ba1..c1c824f67c 100644
--- a/library/cpp/threading/local_executor/local_executor.h
+++ b/library/cpp/threading/local_executor/local_executor.h
@@ -21,11 +21,11 @@ namespace NPar {
virtual void LocalExec(int id) = 0;
};
- // Alternative and simpler way of describing a job for executor. Function argument has the
- // same meaning as `id` in `ILocallyExecutable::LocalExec`.
- //
- using TLocallyExecutableFunction = std::function<void(int)>;
-
+ // Alternative and simpler way of describing a job for executor. Function argument has the
+ // same meaning as `id` in `ILocallyExecutable::LocalExec`.
+ //
+ using TLocallyExecutableFunction = std::function<void(int)>;
+
class ILocalExecutor: public TNonCopyable {
public:
ILocalExecutor() = default;
@@ -157,34 +157,34 @@ namespace NPar {
ExecRange(BlockedLoopBody(params, body), 0, params.GetBlockCount(), flags);
}
- template <typename TBody>
- inline void ExecRangeBlockedWithThrow(TBody&& body, int firstId, int lastId, int batchSizeOrZeroForAutoBatchSize, int flags) {
- if (firstId >= lastId) {
- return;
- }
- const int threadCount = Max(GetThreadCount(), 1);
- const int batchSize = batchSizeOrZeroForAutoBatchSize
- ? batchSizeOrZeroForAutoBatchSize
- : (lastId - firstId + threadCount - 1) / threadCount;
- const int batchCount = (lastId - firstId + batchSize - 1) / batchSize;
- const int batchCountPerThread = (batchCount + threadCount - 1) / threadCount;
- auto states = ExecRangeWithFutures(
- [=](int threadId) {
- for (int batchIdPerThread = 0; batchIdPerThread < batchCountPerThread; ++batchIdPerThread) {
- int batchId = batchIdPerThread * threadCount + threadId;
- int begin = firstId + batchId * batchSize;
- int end = Min(begin + batchSize, lastId);
- for (int i = begin; i < end; ++i) {
- body(i);
- }
- }
- },
- 0, threadCount, flags);
- for (auto& state: states) {
- state.GetValueSync(); // Re-throw exception if any.
- }
- }
-
+ template <typename TBody>
+ inline void ExecRangeBlockedWithThrow(TBody&& body, int firstId, int lastId, int batchSizeOrZeroForAutoBatchSize, int flags) {
+ if (firstId >= lastId) {
+ return;
+ }
+ const int threadCount = Max(GetThreadCount(), 1);
+ const int batchSize = batchSizeOrZeroForAutoBatchSize
+ ? batchSizeOrZeroForAutoBatchSize
+ : (lastId - firstId + threadCount - 1) / threadCount;
+ const int batchCount = (lastId - firstId + batchSize - 1) / batchSize;
+ const int batchCountPerThread = (batchCount + threadCount - 1) / threadCount;
+ auto states = ExecRangeWithFutures(
+ [=](int threadId) {
+ for (int batchIdPerThread = 0; batchIdPerThread < batchCountPerThread; ++batchIdPerThread) {
+ int batchId = batchIdPerThread * threadCount + threadId;
+ int begin = firstId + batchId * batchSize;
+ int end = Min(begin + batchSize, lastId);
+ for (int i = begin; i < end; ++i) {
+ body(i);
+ }
+ }
+ },
+ 0, threadCount, flags);
+ for (auto& state: states) {
+ state.GetValueSync(); // Re-throw exception if any.
+ }
+ }
+
template <typename TBody>
static inline bool TryExecRangeSequentially(TBody&& body, int firstId, int lastId, int flags) {
if (lastId == firstId) {
diff --git a/library/cpp/threading/local_executor/ut/local_executor_ut.cpp b/library/cpp/threading/local_executor/ut/local_executor_ut.cpp
index a169edc2ec..ac5737717c 100644
--- a/library/cpp/threading/local_executor/ut/local_executor_ut.cpp
+++ b/library/cpp/threading/local_executor/ut/local_executor_ut.cpp
@@ -324,48 +324,48 @@ Y_UNIT_TEST(NestedParallelExceptionsDoNotLeak) {
}
}
;
-
-Y_UNIT_TEST_SUITE(ExecLargeRangeWithThrow){
-
- constexpr int LARGE_COUNT = 128 * (1 << 20);
-
- static auto IsValue(char v) {
- return [=](char c) { return c == v; };
- }
-
- Y_UNIT_TEST(ExecLargeRangeNoExceptions) {
- TVector<char> tasks(LARGE_COUNT);
-
- TLocalExecutor localExecutor;
- localExecutor.RunAdditionalThreads(DefaultThreadsCount);
-
- localExecutor.ExecRangeBlockedWithThrow([&tasks](int i) {
- tasks[i] = 1;
- }, 0, tasks.size(), 0, TLocalExecutor::EFlags::WAIT_COMPLETE);
- UNIT_ASSERT(AllOf(tasks, IsValue(1)));
-
-
- localExecutor.ExecRangeBlockedWithThrow([&tasks](int i) {
- tasks[i] += 1;
- }, 0, tasks.size(), 128, TLocalExecutor::EFlags::WAIT_COMPLETE);
- UNIT_ASSERT(AllOf(tasks, IsValue(2)));
- }
-
- Y_UNIT_TEST(ExecLargeRangeWithException) {
- TVector<char> tasks(LARGE_COUNT);
-
- TLocalExecutor localExecutor;
- localExecutor.RunAdditionalThreads(DefaultThreadsCount);
-
- Fill(tasks.begin(), tasks.end(), 0);
- UNIT_ASSERT_EXCEPTION(
- localExecutor.ExecRangeBlockedWithThrow([&tasks](int i) {
- tasks[i] += 1;
- if (i == LARGE_COUNT / 2) {
- throw TTestException();
- }
- }, 0, tasks.size(), 0, TLocalExecutor::EFlags::WAIT_COMPLETE),
- TTestException
- );
- }
-};
+
+Y_UNIT_TEST_SUITE(ExecLargeRangeWithThrow){
+
+ constexpr int LARGE_COUNT = 128 * (1 << 20);
+
+ static auto IsValue(char v) {
+ return [=](char c) { return c == v; };
+ }
+
+ Y_UNIT_TEST(ExecLargeRangeNoExceptions) {
+ TVector<char> tasks(LARGE_COUNT);
+
+ TLocalExecutor localExecutor;
+ localExecutor.RunAdditionalThreads(DefaultThreadsCount);
+
+ localExecutor.ExecRangeBlockedWithThrow([&tasks](int i) {
+ tasks[i] = 1;
+ }, 0, tasks.size(), 0, TLocalExecutor::EFlags::WAIT_COMPLETE);
+ UNIT_ASSERT(AllOf(tasks, IsValue(1)));
+
+
+ localExecutor.ExecRangeBlockedWithThrow([&tasks](int i) {
+ tasks[i] += 1;
+ }, 0, tasks.size(), 128, TLocalExecutor::EFlags::WAIT_COMPLETE);
+ UNIT_ASSERT(AllOf(tasks, IsValue(2)));
+ }
+
+ Y_UNIT_TEST(ExecLargeRangeWithException) {
+ TVector<char> tasks(LARGE_COUNT);
+
+ TLocalExecutor localExecutor;
+ localExecutor.RunAdditionalThreads(DefaultThreadsCount);
+
+ Fill(tasks.begin(), tasks.end(), 0);
+ UNIT_ASSERT_EXCEPTION(
+ localExecutor.ExecRangeBlockedWithThrow([&tasks](int i) {
+ tasks[i] += 1;
+ if (i == LARGE_COUNT / 2) {
+ throw TTestException();
+ }
+ }, 0, tasks.size(), 0, TLocalExecutor::EFlags::WAIT_COMPLETE),
+ TTestException
+ );
+ }
+};