diff options
author | yazevnul <yazevnul@yandex-team.ru> | 2022-02-10 16:46:46 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:46:46 +0300 |
commit | 8cbc307de0221f84c80c42dcbe07d40727537e2c (patch) | |
tree | 625d5a673015d1df891e051033e9fcde5c7be4e5 /library/cpp/threading | |
parent | 30d1ef3941e0dc835be7609de5ebee66958f215a (diff) | |
download | ydb-8cbc307de0221f84c80c42dcbe07d40727537e2c.tar.gz |
Restoring authorship annotation for <yazevnul@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/threading')
25 files changed, 465 insertions, 465 deletions
diff --git a/library/cpp/threading/atomic/bool_ut.cpp b/library/cpp/threading/atomic/bool_ut.cpp index 9481f41d8d..86ea26a23d 100644 --- a/library/cpp/threading/atomic/bool_ut.cpp +++ b/library/cpp/threading/atomic/bool_ut.cpp @@ -2,8 +2,8 @@ #include <library/cpp/testing/unittest/registar.h> -Y_UNIT_TEST_SUITE(AtomicBool) { - Y_UNIT_TEST(ReadWrite) { +Y_UNIT_TEST_SUITE(AtomicBool) { + Y_UNIT_TEST(ReadWrite) { NAtomic::TBool v; UNIT_ASSERT_VALUES_EQUAL((bool)v, false); diff --git a/library/cpp/threading/chunk_queue/queue_ut.cpp b/library/cpp/threading/chunk_queue/queue_ut.cpp index 8cb36d8dd1..3913bd2545 100644 --- a/library/cpp/threading/chunk_queue/queue_ut.cpp +++ b/library/cpp/threading/chunk_queue/queue_ut.cpp @@ -7,8 +7,8 @@ namespace NThreading { //////////////////////////////////////////////////////////////////////////////// - Y_UNIT_TEST_SUITE(TOneOneQueueTest){ - Y_UNIT_TEST(ShouldBeEmptyAtStart){ + Y_UNIT_TEST_SUITE(TOneOneQueueTest){ + Y_UNIT_TEST(ShouldBeEmptyAtStart){ TOneOneQueue<int> queue; int result = 0; @@ -16,7 +16,7 @@ namespace NThreading { UNIT_ASSERT(!queue.Dequeue(result)); } -Y_UNIT_TEST(ShouldReturnEntries) { +Y_UNIT_TEST(ShouldReturnEntries) { TOneOneQueue<int> queue; queue.Enqueue(1); queue.Enqueue(2); @@ -39,7 +39,7 @@ Y_UNIT_TEST(ShouldReturnEntries) { UNIT_ASSERT(!queue.Dequeue(result)); } -Y_UNIT_TEST(ShouldStoreMultipleChunks) { +Y_UNIT_TEST(ShouldStoreMultipleChunks) { TOneOneQueue<int, 100> queue; for (int i = 0; i < 1000; ++i) { queue.Enqueue(i); @@ -57,8 +57,8 @@ Y_UNIT_TEST(ShouldStoreMultipleChunks) { //////////////////////////////////////////////////////////////////////////////// -Y_UNIT_TEST_SUITE(TManyOneQueueTest){ - Y_UNIT_TEST(ShouldBeEmptyAtStart){ +Y_UNIT_TEST_SUITE(TManyOneQueueTest){ + Y_UNIT_TEST(ShouldBeEmptyAtStart){ TManyOneQueue<int> queue; int result; @@ -66,7 +66,7 @@ UNIT_ASSERT(queue.IsEmpty()); UNIT_ASSERT(!queue.Dequeue(result)); } -Y_UNIT_TEST(ShouldReturnEntries) { +Y_UNIT_TEST(ShouldReturnEntries) { TManyOneQueue<int> queue; queue.Enqueue(1); queue.Enqueue(2); @@ -93,8 +93,8 @@ Y_UNIT_TEST(ShouldReturnEntries) { //////////////////////////////////////////////////////////////////////////////// -Y_UNIT_TEST_SUITE(TManyManyQueueTest){ - Y_UNIT_TEST(ShouldBeEmptyAtStart){ +Y_UNIT_TEST_SUITE(TManyManyQueueTest){ + Y_UNIT_TEST(ShouldBeEmptyAtStart){ TManyManyQueue<int> queue; int result = 0; @@ -102,7 +102,7 @@ UNIT_ASSERT(queue.IsEmpty()); UNIT_ASSERT(!queue.Dequeue(result)); } -Y_UNIT_TEST(ShouldReturnEntries) { +Y_UNIT_TEST(ShouldReturnEntries) { TManyManyQueue<int> queue; queue.Enqueue(1); queue.Enqueue(2); @@ -129,8 +129,8 @@ Y_UNIT_TEST(ShouldReturnEntries) { //////////////////////////////////////////////////////////////////////////////// -Y_UNIT_TEST_SUITE(TRelaxedManyOneQueueTest){ - Y_UNIT_TEST(ShouldBeEmptyAtStart){ +Y_UNIT_TEST_SUITE(TRelaxedManyOneQueueTest){ + Y_UNIT_TEST(ShouldBeEmptyAtStart){ TRelaxedManyOneQueue<int> queue; int result; @@ -138,7 +138,7 @@ UNIT_ASSERT(queue.IsEmpty()); UNIT_ASSERT(!queue.Dequeue(result)); } -Y_UNIT_TEST(ShouldReturnEntries) { +Y_UNIT_TEST(ShouldReturnEntries) { TSet<int> items = {1, 2, 3}; TRelaxedManyOneQueue<int> queue; @@ -167,8 +167,8 @@ Y_UNIT_TEST(ShouldReturnEntries) { //////////////////////////////////////////////////////////////////////////////// -Y_UNIT_TEST_SUITE(TRelaxedManyManyQueueTest){ - Y_UNIT_TEST(ShouldBeEmptyAtStart){ +Y_UNIT_TEST_SUITE(TRelaxedManyManyQueueTest){ + Y_UNIT_TEST(ShouldBeEmptyAtStart){ TRelaxedManyManyQueue<int> queue; int result = 0; @@ -176,7 +176,7 @@ UNIT_ASSERT(queue.IsEmpty()); UNIT_ASSERT(!queue.Dequeue(result)); } -Y_UNIT_TEST(ShouldReturnEntries) { +Y_UNIT_TEST(ShouldReturnEntries) { TSet<int> items = {1, 2, 3}; TRelaxedManyManyQueue<int> queue; diff --git a/library/cpp/threading/equeue/equeue_ut.cpp b/library/cpp/threading/equeue/equeue_ut.cpp index 9cf2aced44..a2072f9a83 100644 --- a/library/cpp/threading/equeue/equeue_ut.cpp +++ b/library/cpp/threading/equeue/equeue_ut.cpp @@ -6,7 +6,7 @@ #include <util/datetime/base.h> #include <util/generic/vector.h> -Y_UNIT_TEST_SUITE(TElasticQueueTest) { +Y_UNIT_TEST_SUITE(TElasticQueueTest) { const size_t MaxQueueSize = 20; const size_t ThreadCount = 10; const size_t N = 100000; @@ -37,7 +37,7 @@ Y_UNIT_TEST_SUITE(TElasticQueueTest) { //fill test -- fill queue with "endless" jobs TSystemEvent WaitEvent; - Y_UNIT_TEST(FillTest) { + Y_UNIT_TEST(FillTest) { Counters.Reset(); struct TWaitJob: public IObjectInQueue { @@ -91,7 +91,7 @@ Y_UNIT_TEST_SUITE(TElasticQueueTest) { static size_t TryCounter; - Y_UNIT_TEST(ConcurrentTest) { + Y_UNIT_TEST(ConcurrentTest) { Counters.Reset(); TryCounter = 0; diff --git a/library/cpp/threading/future/async_ut.cpp b/library/cpp/threading/future/async_ut.cpp index a3699744e4..07f2a66951 100644 --- a/library/cpp/threading/future/async_ut.cpp +++ b/library/cpp/threading/future/async_ut.cpp @@ -27,15 +27,15 @@ namespace NThreading { } -Y_UNIT_TEST_SUITE(Async) { - Y_UNIT_TEST(ExtensionExample) { +Y_UNIT_TEST_SUITE(Async) { + Y_UNIT_TEST(ExtensionExample) { TMySuperTaskQueue queue; auto future = NThreading::Async([]() { return 5; }, queue); future.Wait(); UNIT_ASSERT_VALUES_EQUAL(future.GetValue(), 5); } - Y_UNIT_TEST(WorksWithIMtpQueue) { + Y_UNIT_TEST(WorksWithIMtpQueue) { auto queue = MakeHolder<TThreadPool>(); queue->Start(1); @@ -44,7 +44,7 @@ Y_UNIT_TEST_SUITE(Async) { UNIT_ASSERT_VALUES_EQUAL(future.GetValue(), 5); } - Y_UNIT_TEST(ProperlyDeducesFutureType) { + Y_UNIT_TEST(ProperlyDeducesFutureType) { // Compileability test auto queue = CreateThreadPool(1); diff --git a/library/cpp/threading/future/core/future.h b/library/cpp/threading/future/core/future.h index 2e82bb953e..4ab56efb30 100644 --- a/library/cpp/threading/future/core/future.h +++ b/library/cpp/threading/future/core/future.h @@ -1,7 +1,7 @@ #pragma once -#include "fwd.h" - +#include "fwd.h" + #include <util/datetime/base.h> #include <util/generic/function.h> #include <util/generic/maybe.h> diff --git a/library/cpp/threading/future/future_ut.cpp b/library/cpp/threading/future/future_ut.cpp index 05950a568d..f45fc84e23 100644 --- a/library/cpp/threading/future/future_ut.cpp +++ b/library/cpp/threading/future/future_ut.cpp @@ -64,8 +64,8 @@ namespace { //////////////////////////////////////////////////////////////////////////////// - Y_UNIT_TEST_SUITE(TFutureTest) { - Y_UNIT_TEST(ShouldInitiallyHasNoValue) { + Y_UNIT_TEST_SUITE(TFutureTest) { + Y_UNIT_TEST(ShouldInitiallyHasNoValue) { TPromise<int> promise; UNIT_ASSERT(!promise.HasValue()); @@ -79,7 +79,7 @@ namespace { UNIT_ASSERT(!future.HasValue()); } - Y_UNIT_TEST(ShouldInitiallyHasNoValueVoid) { + Y_UNIT_TEST(ShouldInitiallyHasNoValueVoid) { TPromise<void> promise; UNIT_ASSERT(!promise.HasValue()); @@ -93,7 +93,7 @@ namespace { UNIT_ASSERT(!future.HasValue()); } - Y_UNIT_TEST(ShouldStoreValue) { + Y_UNIT_TEST(ShouldStoreValue) { TPromise<int> promise = NewPromise<int>(); promise.SetValue(123); UNIT_ASSERT(promise.HasValue()); @@ -108,7 +108,7 @@ namespace { UNIT_ASSERT_EQUAL(future.GetValue(), 345); } - Y_UNIT_TEST(ShouldStoreValueVoid) { + Y_UNIT_TEST(ShouldStoreValueVoid) { TPromise<void> promise = NewPromise(); promise.SetValue(); UNIT_ASSERT(promise.HasValue()); @@ -151,7 +151,7 @@ namespace { } }; - Y_UNIT_TEST(ShouldInvokeCallback) { + Y_UNIT_TEST(ShouldInvokeCallback) { TPromise<int> promise = NewPromise<int>(); TTestCallback callback(123); @@ -163,7 +163,7 @@ namespace { UNIT_ASSERT_EQUAL(callback.Value, 123 + 456); } - Y_UNIT_TEST(ShouldApplyFunc) { + Y_UNIT_TEST(ShouldApplyFunc) { TPromise<int> promise = NewPromise<int>(); TTestCallback callback(123); @@ -175,7 +175,7 @@ namespace { UNIT_ASSERT_EQUAL(callback.Value, 123 + 456); } - Y_UNIT_TEST(ShouldApplyVoidFunc) { + Y_UNIT_TEST(ShouldApplyVoidFunc) { TPromise<int> promise = NewPromise<int>(); TTestCallback callback(123); @@ -186,7 +186,7 @@ namespace { UNIT_ASSERT(future.HasValue()); } - Y_UNIT_TEST(ShouldApplyFutureFunc) { + Y_UNIT_TEST(ShouldApplyFutureFunc) { TPromise<int> promise = NewPromise<int>(); TTestCallback callback(123); @@ -198,7 +198,7 @@ namespace { UNIT_ASSERT_EQUAL(callback.Value, 123 + 456); } - Y_UNIT_TEST(ShouldApplyFutureVoidFunc) { + Y_UNIT_TEST(ShouldApplyFutureVoidFunc) { TPromise<int> promise = NewPromise<int>(); TTestCallback callback(123); @@ -212,7 +212,7 @@ namespace { UNIT_ASSERT(future.HasValue()); } - Y_UNIT_TEST(ShouldIgnoreResultIfAsked) { + Y_UNIT_TEST(ShouldIgnoreResultIfAsked) { TPromise<int> promise = NewPromise<int>(); TTestCallback callback(123); @@ -225,7 +225,7 @@ namespace { class TCustomException: public yexception { }; - Y_UNIT_TEST(ShouldRethrowException) { + Y_UNIT_TEST(ShouldRethrowException) { TPromise<int> promise = NewPromise<int>(); try { ythrow TCustomException(); @@ -335,7 +335,7 @@ namespace { UNIT_ASSERT(future.HasValue()); } - Y_UNIT_TEST(ShouldWaitAnyVector) { + Y_UNIT_TEST(ShouldWaitAnyVector) { TPromise<void> promise1 = NewPromise(); TPromise<void> promise2 = NewPromise(); @@ -372,7 +372,7 @@ namespace { UNIT_ASSERT(future.HasValue()); } - Y_UNIT_TEST(ShouldWaitAnyList) { + Y_UNIT_TEST(ShouldWaitAnyList) { TPromise<void> promise1 = NewPromise(); TPromise<void> promise2 = NewPromise(); @@ -390,14 +390,14 @@ namespace { UNIT_ASSERT(future.HasValue()); } - Y_UNIT_TEST(ShouldWaitAnyVectorEmpty) { + Y_UNIT_TEST(ShouldWaitAnyVectorEmpty) { TVector<TFuture<void>> promises; TFuture<void> future = WaitAny(promises); UNIT_ASSERT(future.HasValue()); } - Y_UNIT_TEST(ShouldWaitAny) { + Y_UNIT_TEST(ShouldWaitAny) { TPromise<void> promise1 = NewPromise(); TPromise<void> promise2 = NewPromise(); @@ -411,7 +411,7 @@ namespace { UNIT_ASSERT(future.HasValue()); } - Y_UNIT_TEST(ShouldStoreTypesWithoutDefaultConstructor) { + Y_UNIT_TEST(ShouldStoreTypesWithoutDefaultConstructor) { // compileability test struct TRec { explicit TRec(int) { @@ -426,7 +426,7 @@ namespace { Y_UNUSED(rec); } - Y_UNIT_TEST(ShouldStoreMovableTypes) { + Y_UNIT_TEST(ShouldStoreMovableTypes) { // compileability test struct TRec : TMoveOnly { explicit TRec(int) { @@ -441,7 +441,7 @@ namespace { Y_UNUSED(rec); } - Y_UNIT_TEST(ShouldMoveMovableTypes) { + Y_UNIT_TEST(ShouldMoveMovableTypes) { // compileability test struct TRec : TMoveOnly { explicit TRec(int) { @@ -456,7 +456,7 @@ namespace { Y_UNUSED(rec); } - Y_UNIT_TEST(ShouldNotExtractAfterGet) { + Y_UNIT_TEST(ShouldNotExtractAfterGet) { TPromise<int> promise = NewPromise<int>(); promise.SetValue(123); UNIT_ASSERT(promise.HasValue()); @@ -464,7 +464,7 @@ namespace { UNIT_CHECK_GENERATED_EXCEPTION(promise.ExtractValue(), TFutureException); } - Y_UNIT_TEST(ShouldNotGetAfterExtract) { + Y_UNIT_TEST(ShouldNotGetAfterExtract) { TPromise<int> promise = NewPromise<int>(); promise.SetValue(123); UNIT_ASSERT(promise.HasValue()); @@ -472,7 +472,7 @@ namespace { UNIT_CHECK_GENERATED_EXCEPTION(promise.GetValue(), TFutureException); } - Y_UNIT_TEST(ShouldNotExtractAfterExtract) { + Y_UNIT_TEST(ShouldNotExtractAfterExtract) { TPromise<int> promise = NewPromise<int>(); promise.SetValue(123); UNIT_ASSERT(promise.HasValue()); diff --git a/library/cpp/threading/future/fwd.cpp b/library/cpp/threading/future/fwd.cpp index 4214b6df83..2261ef316c 100644 --- a/library/cpp/threading/future/fwd.cpp +++ b/library/cpp/threading/future/fwd.cpp @@ -1 +1 @@ -#include "fwd.h" +#include "fwd.h" diff --git a/library/cpp/threading/future/fwd.h b/library/cpp/threading/future/fwd.h index 0cd25dd288..b51d9b0019 100644 --- a/library/cpp/threading/future/fwd.h +++ b/library/cpp/threading/future/fwd.h @@ -1,8 +1,8 @@ -#pragma once - +#pragma once + #include "core/fwd.h" -namespace NThreading { - template <typename TR = void, bool IgnoreException = false> - class TLegacyFuture; -} +namespace NThreading { + template <typename TR = void, bool IgnoreException = false> + class TLegacyFuture; +} diff --git a/library/cpp/threading/future/legacy_future.h b/library/cpp/threading/future/legacy_future.h index 6f1eabad73..9bb126e76b 100644 --- a/library/cpp/threading/future/legacy_future.h +++ b/library/cpp/threading/future/legacy_future.h @@ -1,6 +1,6 @@ #pragma once -#include "fwd.h" +#include "fwd.h" #include "future.h" #include <util/thread/factory.h> @@ -8,7 +8,7 @@ #include <functional> namespace NThreading { - template <typename TR, bool IgnoreException> + template <typename TR, bool IgnoreException> class TLegacyFuture: public IThreadFactory::IThreadAble, TNonCopyable { public: typedef TR(TFunctionSignature)(); diff --git a/library/cpp/threading/future/legacy_future_ut.cpp b/library/cpp/threading/future/legacy_future_ut.cpp index ff63db1725..b0c9dc21aa 100644 --- a/library/cpp/threading/future/legacy_future_ut.cpp +++ b/library/cpp/threading/future/legacy_future_ut.cpp @@ -3,12 +3,12 @@ #include <library/cpp/testing/unittest/registar.h> namespace NThreading { - Y_UNIT_TEST_SUITE(TLegacyFutureTest) { + Y_UNIT_TEST_SUITE(TLegacyFutureTest) { int intf() { return 17; } - Y_UNIT_TEST(TestIntFunction) { + Y_UNIT_TEST(TestIntFunction) { TLegacyFuture<int> f((&intf)); UNIT_ASSERT_VALUES_EQUAL(17, f.Get()); } @@ -19,7 +19,7 @@ namespace NThreading { r = 18; } - Y_UNIT_TEST(TestVoidFunction) { + Y_UNIT_TEST(TestVoidFunction) { r = 0; TLegacyFuture<> f((&voidf)); f.Get(); @@ -39,7 +39,7 @@ namespace NThreading { } }; - Y_UNIT_TEST(TestMethod) { + Y_UNIT_TEST(TestMethod) { TLegacyFuture<int> f11(std::bind(&TSampleClass::Calc, TSampleClass(3))); UNIT_ASSERT_VALUES_EQUAL(4, f11.Get()); @@ -57,7 +57,7 @@ namespace NThreading { struct TSomeThreadPool: public IThreadFactory {}; - Y_UNIT_TEST(TestFunction) { + Y_UNIT_TEST(TestFunction) { std::function<int()> f((&intf)); UNIT_ASSERT_VALUES_EQUAL(17, TLegacyFuture<int>(f).Get()); diff --git a/library/cpp/threading/future/wait/wait.h b/library/cpp/threading/future/wait/wait.h index 6ff7d57baa..6497574cec 100644 --- a/library/cpp/threading/future/wait/wait.h +++ b/library/cpp/threading/future/wait/wait.h @@ -1,7 +1,7 @@ #pragma once -#include "fwd.h" - +#include "fwd.h" + #include <library/cpp/threading/future/core/future.h> #include <library/cpp/threading/future/wait/wait_group.h> diff --git a/library/cpp/threading/future/ya.make b/library/cpp/threading/future/ya.make index 6591031f46..d3ad13fa8e 100644 --- a/library/cpp/threading/future/ya.make +++ b/library/cpp/threading/future/ya.make @@ -1,14 +1,14 @@ -OWNER( - g:rtmr -) - +OWNER( + g:rtmr +) + LIBRARY() SRCS( async.cpp core/future.cpp core/fwd.cpp - fwd.cpp + fwd.cpp wait/fwd.cpp wait/wait.cpp wait/wait_group.cpp diff --git a/library/cpp/threading/local_executor/local_executor.cpp b/library/cpp/threading/local_executor/local_executor.cpp index 1d3fbb4bf4..6e62d09d85 100644 --- a/library/cpp/threading/local_executor/local_executor.cpp +++ b/library/cpp/threading/local_executor/local_executor.cpp @@ -1,17 +1,17 @@ #include "local_executor.h" #include <library/cpp/threading/future/future.h> - -#include <util/generic/utility.h> -#include <util/system/atomic.h> -#include <util/system/event.h> + +#include <util/generic/utility.h> +#include <util/system/atomic.h> +#include <util/system/event.h> #include <util/system/thread.h> -#include <util/system/tls.h> +#include <util/system/tls.h> #include <util/system/yield.h> -#include <util/thread/lfqueue.h> - -#include <utility> +#include <util/thread/lfqueue.h> +#include <utility> + #ifdef _win_ static void RegularYield() { } @@ -23,11 +23,11 @@ static void RegularYield() { } #endif -namespace { - struct TFunctionWrapper : NPar::ILocallyExecutable { - NPar::TLocallyExecutableFunction Exec; - TFunctionWrapper(NPar::TLocallyExecutableFunction exec) - : Exec(std::move(exec)) +namespace { + struct TFunctionWrapper : NPar::ILocallyExecutable { + NPar::TLocallyExecutableFunction Exec; + TFunctionWrapper(NPar::TLocallyExecutableFunction exec) + : Exec(std::move(exec)) { } void LocalExec(int id) override { @@ -35,15 +35,15 @@ namespace { } }; - class TFunctionWrapperWithPromise: public NPar::ILocallyExecutable { + class TFunctionWrapperWithPromise: public NPar::ILocallyExecutable { private: - NPar::TLocallyExecutableFunction Exec; + NPar::TLocallyExecutableFunction Exec; int FirstId, LastId; TVector<NThreading::TPromise<void>> Promises; public: - TFunctionWrapperWithPromise(NPar::TLocallyExecutableFunction exec, int firstId, int lastId) - : Exec(std::move(exec)) + TFunctionWrapperWithPromise(NPar::TLocallyExecutableFunction exec, int firstId, int lastId) + : Exec(std::move(exec)) , FirstId(firstId) , LastId(lastId) { @@ -70,300 +70,300 @@ namespace { } }; - struct TSingleJob { - TIntrusivePtr<NPar::ILocallyExecutable> Exec; - int Id{0}; + struct TSingleJob { + TIntrusivePtr<NPar::ILocallyExecutable> Exec; + int Id{0}; - TSingleJob() = default; - TSingleJob(TIntrusivePtr<NPar::ILocallyExecutable> exec, int id) - : Exec(std::move(exec)) - , Id(id) - { + TSingleJob() = default; + TSingleJob(TIntrusivePtr<NPar::ILocallyExecutable> exec, int id) + : Exec(std::move(exec)) + , Id(id) + { } - }; + }; - class TLocalRangeExecutor: public NPar::ILocallyExecutable { - TIntrusivePtr<NPar::ILocallyExecutable> Exec; + class TLocalRangeExecutor: public NPar::ILocallyExecutable { + TIntrusivePtr<NPar::ILocallyExecutable> Exec; alignas(64) TAtomic Counter; alignas(64) TAtomic WorkerCount; - int LastId; - - void LocalExec(int) override { - AtomicAdd(WorkerCount, 1); - for (;;) { - if (!DoSingleOp()) - break; - } - AtomicAdd(WorkerCount, -1); + int LastId; + + void LocalExec(int) override { + AtomicAdd(WorkerCount, 1); + for (;;) { + if (!DoSingleOp()) + break; + } + AtomicAdd(WorkerCount, -1); } - public: - TLocalRangeExecutor(TIntrusivePtr<ILocallyExecutable> exec, int firstId, int lastId) - : Exec(std::move(exec)) - , Counter(firstId) - , WorkerCount(0) - , LastId(lastId) - { + public: + TLocalRangeExecutor(TIntrusivePtr<ILocallyExecutable> exec, int firstId, int lastId) + : Exec(std::move(exec)) + , Counter(firstId) + , WorkerCount(0) + , LastId(lastId) + { } - bool DoSingleOp() { + bool DoSingleOp() { const int id = AtomicAdd(Counter, 1) - 1; - if (id >= LastId) - return false; - Exec->LocalExec(id); - RegularYield(); - return true; + if (id >= LastId) + return false; + Exec->LocalExec(id); + RegularYield(); + return true; } - void WaitComplete() { - while (AtomicGet(WorkerCount) > 0) - RegularYield(); + void WaitComplete() { + while (AtomicGet(WorkerCount) > 0) + RegularYield(); } - int GetRangeSize() const { - return Max<int>(LastId - Counter, 0); + int GetRangeSize() const { + return Max<int>(LastId - Counter, 0); } - }; + }; } - -////////////////////////////////////////////////////////////////////////// -class NPar::TLocalExecutor::TImpl { -public: - TLockFreeQueue<TSingleJob> JobQueue; - TLockFreeQueue<TSingleJob> MedJobQueue; - TLockFreeQueue<TSingleJob> LowJobQueue; + +////////////////////////////////////////////////////////////////////////// +class NPar::TLocalExecutor::TImpl { +public: + TLockFreeQueue<TSingleJob> JobQueue; + TLockFreeQueue<TSingleJob> MedJobQueue; + TLockFreeQueue<TSingleJob> LowJobQueue; alignas(64) TSystemEvent HasJob; - - TAtomic ThreadCount{0}; + + TAtomic ThreadCount{0}; alignas(64) TAtomic QueueSize{0}; - TAtomic MPQueueSize{0}; - TAtomic LPQueueSize{0}; - TAtomic ThreadId{0}; - - Y_THREAD(int) - CurrentTaskPriority; - Y_THREAD(int) - WorkerThreadId; - - static void* HostWorkerThread(void* p); - bool GetJob(TSingleJob* job); - void RunNewThread(); - void LaunchRange(TIntrusivePtr<TLocalRangeExecutor> execRange, int queueSizeLimit, - TAtomic* queueSize, TLockFreeQueue<TSingleJob>* jobQueue); - - TImpl() = default; - ~TImpl(); -}; - -NPar::TLocalExecutor::TImpl::~TImpl() { - AtomicAdd(QueueSize, 1); - JobQueue.Enqueue(TSingleJob(nullptr, 0)); - HasJob.Signal(); - while (AtomicGet(ThreadCount)) { - ThreadYield(); - } -} - -void* NPar::TLocalExecutor::TImpl::HostWorkerThread(void* p) { - static const int FAST_ITERATIONS = 200; - - auto* const ctx = (TImpl*)p; + TAtomic MPQueueSize{0}; + TAtomic LPQueueSize{0}; + TAtomic ThreadId{0}; + + Y_THREAD(int) + CurrentTaskPriority; + Y_THREAD(int) + WorkerThreadId; + + static void* HostWorkerThread(void* p); + bool GetJob(TSingleJob* job); + void RunNewThread(); + void LaunchRange(TIntrusivePtr<TLocalRangeExecutor> execRange, int queueSizeLimit, + TAtomic* queueSize, TLockFreeQueue<TSingleJob>* jobQueue); + + TImpl() = default; + ~TImpl(); +}; + +NPar::TLocalExecutor::TImpl::~TImpl() { + AtomicAdd(QueueSize, 1); + JobQueue.Enqueue(TSingleJob(nullptr, 0)); + HasJob.Signal(); + while (AtomicGet(ThreadCount)) { + ThreadYield(); + } +} + +void* NPar::TLocalExecutor::TImpl::HostWorkerThread(void* p) { + static const int FAST_ITERATIONS = 200; + + auto* const ctx = (TImpl*)p; TThread::SetCurrentThreadName("ParLocalExecutor"); - ctx->WorkerThreadId = AtomicAdd(ctx->ThreadId, 1); - for (bool cont = true; cont;) { - TSingleJob job; - bool gotJob = false; - for (int iter = 0; iter < FAST_ITERATIONS; ++iter) { - if (ctx->GetJob(&job)) { - gotJob = true; - break; - } - } - if (!gotJob) { - ctx->HasJob.Reset(); - if (!ctx->GetJob(&job)) { - ctx->HasJob.Wait(); - continue; - } - } - if (job.Exec.Get()) { - job.Exec->LocalExec(job.Id); - RegularYield(); - } else { - AtomicAdd(ctx->QueueSize, 1); - ctx->JobQueue.Enqueue(job); - ctx->HasJob.Signal(); - cont = false; - } - } - AtomicAdd(ctx->ThreadCount, -1); - return nullptr; -} - -bool NPar::TLocalExecutor::TImpl::GetJob(TSingleJob* job) { - if (JobQueue.Dequeue(job)) { - CurrentTaskPriority = TLocalExecutor::HIGH_PRIORITY; - AtomicAdd(QueueSize, -1); - return true; - } else if (MedJobQueue.Dequeue(job)) { - CurrentTaskPriority = TLocalExecutor::MED_PRIORITY; - AtomicAdd(MPQueueSize, -1); - return true; - } else if (LowJobQueue.Dequeue(job)) { - CurrentTaskPriority = TLocalExecutor::LOW_PRIORITY; - AtomicAdd(LPQueueSize, -1); - return true; - } - return false; -} - -void NPar::TLocalExecutor::TImpl::RunNewThread() { - AtomicAdd(ThreadCount, 1); - TThread thr(HostWorkerThread, this); - thr.Start(); - thr.Detach(); -} - -void NPar::TLocalExecutor::TImpl::LaunchRange(TIntrusivePtr<TLocalRangeExecutor> rangeExec, - int queueSizeLimit, - TAtomic* queueSize, - TLockFreeQueue<TSingleJob>* jobQueue) { - int count = Min<int>(ThreadCount + 1, rangeExec->GetRangeSize()); - if (queueSizeLimit >= 0 && AtomicGet(*queueSize) >= queueSizeLimit) { - return; - } - AtomicAdd(*queueSize, count); + ctx->WorkerThreadId = AtomicAdd(ctx->ThreadId, 1); + for (bool cont = true; cont;) { + TSingleJob job; + bool gotJob = false; + for (int iter = 0; iter < FAST_ITERATIONS; ++iter) { + if (ctx->GetJob(&job)) { + gotJob = true; + break; + } + } + if (!gotJob) { + ctx->HasJob.Reset(); + if (!ctx->GetJob(&job)) { + ctx->HasJob.Wait(); + continue; + } + } + if (job.Exec.Get()) { + job.Exec->LocalExec(job.Id); + RegularYield(); + } else { + AtomicAdd(ctx->QueueSize, 1); + ctx->JobQueue.Enqueue(job); + ctx->HasJob.Signal(); + cont = false; + } + } + AtomicAdd(ctx->ThreadCount, -1); + return nullptr; +} + +bool NPar::TLocalExecutor::TImpl::GetJob(TSingleJob* job) { + if (JobQueue.Dequeue(job)) { + CurrentTaskPriority = TLocalExecutor::HIGH_PRIORITY; + AtomicAdd(QueueSize, -1); + return true; + } else if (MedJobQueue.Dequeue(job)) { + CurrentTaskPriority = TLocalExecutor::MED_PRIORITY; + AtomicAdd(MPQueueSize, -1); + return true; + } else if (LowJobQueue.Dequeue(job)) { + CurrentTaskPriority = TLocalExecutor::LOW_PRIORITY; + AtomicAdd(LPQueueSize, -1); + return true; + } + return false; +} + +void NPar::TLocalExecutor::TImpl::RunNewThread() { + AtomicAdd(ThreadCount, 1); + TThread thr(HostWorkerThread, this); + thr.Start(); + thr.Detach(); +} + +void NPar::TLocalExecutor::TImpl::LaunchRange(TIntrusivePtr<TLocalRangeExecutor> rangeExec, + int queueSizeLimit, + TAtomic* queueSize, + TLockFreeQueue<TSingleJob>* jobQueue) { + int count = Min<int>(ThreadCount + 1, rangeExec->GetRangeSize()); + if (queueSizeLimit >= 0 && AtomicGet(*queueSize) >= queueSizeLimit) { + return; + } + AtomicAdd(*queueSize, count); jobQueue->EnqueueAll(TVector<TSingleJob>{size_t(count), TSingleJob(rangeExec, 0)}); - HasJob.Signal(); -} - -NPar::TLocalExecutor::TLocalExecutor() - : Impl_{MakeHolder<TImpl>()} { -} - -NPar::TLocalExecutor::~TLocalExecutor() = default; - -void NPar::TLocalExecutor::RunAdditionalThreads(int threadCount) { - for (int i = 0; i < threadCount; i++) - Impl_->RunNewThread(); -} - -void NPar::TLocalExecutor::Exec(TIntrusivePtr<ILocallyExecutable> exec, int id, int flags) { - Y_ASSERT((flags & WAIT_COMPLETE) == 0); // unsupported - int prior = Max<int>(Impl_->CurrentTaskPriority, flags & PRIORITY_MASK); - switch (prior) { - case HIGH_PRIORITY: - AtomicAdd(Impl_->QueueSize, 1); - Impl_->JobQueue.Enqueue(TSingleJob(std::move(exec), id)); - break; - case MED_PRIORITY: - AtomicAdd(Impl_->MPQueueSize, 1); - Impl_->MedJobQueue.Enqueue(TSingleJob(std::move(exec), id)); - break; - case LOW_PRIORITY: - AtomicAdd(Impl_->LPQueueSize, 1); - Impl_->LowJobQueue.Enqueue(TSingleJob(std::move(exec), id)); - break; - default: - Y_ASSERT(0); - break; - } - Impl_->HasJob.Signal(); -} - + HasJob.Signal(); +} + +NPar::TLocalExecutor::TLocalExecutor() + : Impl_{MakeHolder<TImpl>()} { +} + +NPar::TLocalExecutor::~TLocalExecutor() = default; + +void NPar::TLocalExecutor::RunAdditionalThreads(int threadCount) { + for (int i = 0; i < threadCount; i++) + Impl_->RunNewThread(); +} + +void NPar::TLocalExecutor::Exec(TIntrusivePtr<ILocallyExecutable> exec, int id, int flags) { + Y_ASSERT((flags & WAIT_COMPLETE) == 0); // unsupported + int prior = Max<int>(Impl_->CurrentTaskPriority, flags & PRIORITY_MASK); + switch (prior) { + case HIGH_PRIORITY: + AtomicAdd(Impl_->QueueSize, 1); + Impl_->JobQueue.Enqueue(TSingleJob(std::move(exec), id)); + break; + case MED_PRIORITY: + AtomicAdd(Impl_->MPQueueSize, 1); + Impl_->MedJobQueue.Enqueue(TSingleJob(std::move(exec), id)); + break; + case LOW_PRIORITY: + AtomicAdd(Impl_->LPQueueSize, 1); + Impl_->LowJobQueue.Enqueue(TSingleJob(std::move(exec), id)); + break; + default: + Y_ASSERT(0); + break; + } + Impl_->HasJob.Signal(); +} + void NPar::ILocalExecutor::Exec(TLocallyExecutableFunction exec, int id, int flags) { - Exec(new TFunctionWrapper(std::move(exec)), id, flags); -} - -void NPar::TLocalExecutor::ExecRange(TIntrusivePtr<ILocallyExecutable> exec, int firstId, int lastId, int flags) { - Y_ASSERT(lastId >= firstId); + Exec(new TFunctionWrapper(std::move(exec)), id, flags); +} + +void NPar::TLocalExecutor::ExecRange(TIntrusivePtr<ILocallyExecutable> exec, int firstId, int lastId, int flags) { + Y_ASSERT(lastId >= firstId); if (TryExecRangeSequentially([=] (int id) { exec->LocalExec(id); }, firstId, lastId, flags)) { - return; - } - auto rangeExec = MakeIntrusive<TLocalRangeExecutor>(std::move(exec), firstId, lastId); - int queueSizeLimit = (flags & WAIT_COMPLETE) ? 10000 : -1; - int prior = Max<int>(Impl_->CurrentTaskPriority, flags & PRIORITY_MASK); - switch (prior) { - case HIGH_PRIORITY: - Impl_->LaunchRange(rangeExec, queueSizeLimit, &Impl_->QueueSize, &Impl_->JobQueue); - break; - case MED_PRIORITY: - Impl_->LaunchRange(rangeExec, queueSizeLimit, &Impl_->MPQueueSize, &Impl_->MedJobQueue); - break; - case LOW_PRIORITY: - Impl_->LaunchRange(rangeExec, queueSizeLimit, &Impl_->LPQueueSize, &Impl_->LowJobQueue); - break; - default: - Y_ASSERT(0); - break; - } - if (flags & WAIT_COMPLETE) { - int keepPrior = Impl_->CurrentTaskPriority; - Impl_->CurrentTaskPriority = prior; - while (rangeExec->DoSingleOp()) { - } - Impl_->CurrentTaskPriority = keepPrior; - rangeExec->WaitComplete(); - } -} - + return; + } + auto rangeExec = MakeIntrusive<TLocalRangeExecutor>(std::move(exec), firstId, lastId); + int queueSizeLimit = (flags & WAIT_COMPLETE) ? 10000 : -1; + int prior = Max<int>(Impl_->CurrentTaskPriority, flags & PRIORITY_MASK); + switch (prior) { + case HIGH_PRIORITY: + Impl_->LaunchRange(rangeExec, queueSizeLimit, &Impl_->QueueSize, &Impl_->JobQueue); + break; + case MED_PRIORITY: + Impl_->LaunchRange(rangeExec, queueSizeLimit, &Impl_->MPQueueSize, &Impl_->MedJobQueue); + break; + case LOW_PRIORITY: + Impl_->LaunchRange(rangeExec, queueSizeLimit, &Impl_->LPQueueSize, &Impl_->LowJobQueue); + break; + default: + Y_ASSERT(0); + break; + } + if (flags & WAIT_COMPLETE) { + int keepPrior = Impl_->CurrentTaskPriority; + Impl_->CurrentTaskPriority = prior; + while (rangeExec->DoSingleOp()) { + } + Impl_->CurrentTaskPriority = keepPrior; + rangeExec->WaitComplete(); + } +} + void NPar::ILocalExecutor::ExecRange(TLocallyExecutableFunction exec, int firstId, int lastId, int flags) { if (TryExecRangeSequentially(exec, firstId, lastId, flags)) { return; } - ExecRange(new TFunctionWrapper(exec), firstId, lastId, flags); -} - + ExecRange(new TFunctionWrapper(exec), firstId, lastId, flags); +} + void NPar::ILocalExecutor::ExecRangeWithThrow(TLocallyExecutableFunction exec, int firstId, int lastId, int flags) { - Y_VERIFY((flags & WAIT_COMPLETE) != 0, "ExecRangeWithThrow() requires WAIT_COMPLETE to wait if exceptions arise."); + Y_VERIFY((flags & WAIT_COMPLETE) != 0, "ExecRangeWithThrow() requires WAIT_COMPLETE to wait if exceptions arise."); if (TryExecRangeSequentially(exec, firstId, lastId, flags)) { return; } - TVector<NThreading::TFuture<void>> currentRun = ExecRangeWithFutures(exec, firstId, lastId, flags); - for (auto& result : currentRun) { - result.GetValueSync(); // Exception will be rethrown if exists. If several exception - only the one with minimal id is rethrown. - } -} - -TVector<NThreading::TFuture<void>> + TVector<NThreading::TFuture<void>> currentRun = ExecRangeWithFutures(exec, firstId, lastId, flags); + for (auto& result : currentRun) { + result.GetValueSync(); // Exception will be rethrown if exists. If several exception - only the one with minimal id is rethrown. + } +} + +TVector<NThreading::TFuture<void>> NPar::ILocalExecutor::ExecRangeWithFutures(TLocallyExecutableFunction exec, int firstId, int lastId, int flags) { - TFunctionWrapperWithPromise* execWrapper = new TFunctionWrapperWithPromise(exec, firstId, lastId); - TVector<NThreading::TFuture<void>> out = execWrapper->GetFutures(); - ExecRange(execWrapper, firstId, lastId, flags); - return out; -} - -void NPar::TLocalExecutor::ClearLPQueue() { - for (bool cont = true; cont;) { - cont = false; - TSingleJob job; - while (Impl_->LowJobQueue.Dequeue(&job)) { - AtomicAdd(Impl_->LPQueueSize, -1); - cont = true; - } - while (Impl_->MedJobQueue.Dequeue(&job)) { - AtomicAdd(Impl_->MPQueueSize, -1); - cont = true; - } - } -} - -int NPar::TLocalExecutor::GetQueueSize() const noexcept { - return AtomicGet(Impl_->QueueSize); -} - -int NPar::TLocalExecutor::GetMPQueueSize() const noexcept { - return AtomicGet(Impl_->MPQueueSize); -} - -int NPar::TLocalExecutor::GetLPQueueSize() const noexcept { - return AtomicGet(Impl_->LPQueueSize); -} - + TFunctionWrapperWithPromise* execWrapper = new TFunctionWrapperWithPromise(exec, firstId, lastId); + TVector<NThreading::TFuture<void>> out = execWrapper->GetFutures(); + ExecRange(execWrapper, firstId, lastId, flags); + return out; +} + +void NPar::TLocalExecutor::ClearLPQueue() { + for (bool cont = true; cont;) { + cont = false; + TSingleJob job; + while (Impl_->LowJobQueue.Dequeue(&job)) { + AtomicAdd(Impl_->LPQueueSize, -1); + cont = true; + } + while (Impl_->MedJobQueue.Dequeue(&job)) { + AtomicAdd(Impl_->MPQueueSize, -1); + cont = true; + } + } +} + +int NPar::TLocalExecutor::GetQueueSize() const noexcept { + return AtomicGet(Impl_->QueueSize); +} + +int NPar::TLocalExecutor::GetMPQueueSize() const noexcept { + return AtomicGet(Impl_->MPQueueSize); +} + +int NPar::TLocalExecutor::GetLPQueueSize() const noexcept { + return AtomicGet(Impl_->LPQueueSize); +} + int NPar::TLocalExecutor::GetWorkerThreadId() const noexcept { - return Impl_->WorkerThreadId; -} - -int NPar::TLocalExecutor::GetThreadCount() const noexcept { - return AtomicGet(Impl_->ThreadCount); -} - -////////////////////////////////////////////////////////////////////////// + return Impl_->WorkerThreadId; +} + +int NPar::TLocalExecutor::GetThreadCount() const noexcept { + return AtomicGet(Impl_->ThreadCount); +} + +////////////////////////////////////////////////////////////////////////// diff --git a/library/cpp/threading/local_executor/local_executor.h b/library/cpp/threading/local_executor/local_executor.h index c1c824f67c..aa500d34d3 100644 --- a/library/cpp/threading/local_executor/local_executor.h +++ b/library/cpp/threading/local_executor/local_executor.h @@ -1,23 +1,23 @@ #pragma once #include <library/cpp/threading/future/future.h> - + #include <util/generic/cast.h> -#include <util/generic/fwd.h> -#include <util/generic/noncopyable.h> +#include <util/generic/fwd.h> +#include <util/generic/noncopyable.h> #include <util/generic/ptr.h> -#include <util/generic/singleton.h> +#include <util/generic/singleton.h> #include <util/generic/ymath.h> - + #include <functional> namespace NPar { struct ILocallyExecutable : virtual public TThrRefBase { - // Must be implemented by the end user to define job that will be processed by one of - // executor threads. - // - // @param id Job parameter, typically an index pointing somewhere in array, or just - // some dummy value, e.g. `0`. + // Must be implemented by the end user to define job that will be processed by one of + // executor threads. + // + // @param id Job parameter, typically an index pointing somewhere in array, or just + // some dummy value, e.g. `0`. virtual void LocalExec(int id) = 0; }; @@ -31,7 +31,7 @@ namespace NPar { ILocalExecutor() = default; virtual ~ILocalExecutor() = default; - enum EFlags : int { + enum EFlags : int { HIGH_PRIORITY = 0, MED_PRIORITY = 1, LOW_PRIORITY = 2, @@ -58,8 +58,8 @@ namespace NPar { virtual int GetWorkerThreadId() const noexcept = 0; virtual int GetThreadCount() const noexcept = 0; - // Describes a range of tasks with parameters from integer range [FirstId, LastId). - // + // Describes a range of tasks with parameters from integer range [FirstId, LastId). + // class TExecRangeParams { public: template <typename TFirst, typename TLast> @@ -70,9 +70,9 @@ namespace NPar { Y_ASSERT(LastId >= FirstId); SetBlockSize(1); } - // Partition tasks into `blockCount` blocks of approximately equal size, each of which - // will be executed as a separate bigger task. - // + // Partition tasks into `blockCount` blocks of approximately equal size, each of which + // will be executed as a separate bigger task. + // template <typename TBlockCount> TExecRangeParams& SetBlockCount(TBlockCount blockCount) { Y_ASSERT(SafeIntegerCast<int>(blockCount) > 0 || FirstId == LastId); @@ -81,9 +81,9 @@ namespace NPar { BlockEqualToThreads = false; return *this; } - // Partition tasks into blocks of approximately `blockSize` size, each of which will - // be executed as a separate bigger task. - // + // Partition tasks into blocks of approximately `blockSize` size, each of which will + // be executed as a separate bigger task. + // template <typename TBlockSize> TExecRangeParams& SetBlockSize(TBlockSize blockSize) { Y_ASSERT(SafeIntegerCast<int>(blockSize) > 0 || FirstId == LastId); @@ -92,9 +92,9 @@ namespace NPar { BlockEqualToThreads = false; return *this; } - // Partition tasks into thread count blocks of approximately equal size, each of which - // will be executed as a separate bigger task. - // + // Partition tasks into thread count blocks of approximately equal size, each of which + // will be executed as a separate bigger task. + // TExecRangeParams& SetBlockCountToThreadCount() { BlockEqualToThreads = true; return *this; @@ -107,9 +107,9 @@ namespace NPar { Y_ASSERT(!BlockEqualToThreads); return BlockSize; } - bool GetBlockEqualToThreads() { - return BlockEqualToThreads; - } + bool GetBlockEqualToThreads() { + return BlockEqualToThreads; + } const int FirstId = 0; const int LastId = 0; @@ -120,26 +120,26 @@ namespace NPar { bool BlockEqualToThreads; }; - // `Exec` and `ExecRange` versions that accept functions. - // - void Exec(TLocallyExecutableFunction exec, int id, int flags); - void ExecRange(TLocallyExecutableFunction exec, int firstId, int lastId, int flags); - - // Version of `ExecRange` that throws exception from task with minimal id if at least one of - // task threw an exception. - // - void ExecRangeWithThrow(TLocallyExecutableFunction exec, int firstId, int lastId, int flags); - - // Version of `ExecRange` that returns vector of futures, thus allowing to retry any task if - // it fails. - // - TVector<NThreading::TFuture<void>> ExecRangeWithFutures(TLocallyExecutableFunction exec, int firstId, int lastId, int flags); - + // `Exec` and `ExecRange` versions that accept functions. + // + void Exec(TLocallyExecutableFunction exec, int id, int flags); + void ExecRange(TLocallyExecutableFunction exec, int firstId, int lastId, int flags); + + // Version of `ExecRange` that throws exception from task with minimal id if at least one of + // task threw an exception. + // + void ExecRangeWithThrow(TLocallyExecutableFunction exec, int firstId, int lastId, int flags); + + // Version of `ExecRange` that returns vector of futures, thus allowing to retry any task if + // it fails. + // + TVector<NThreading::TFuture<void>> ExecRangeWithFutures(TLocallyExecutableFunction exec, int firstId, int lastId, int flags); + template <typename TBody> static inline auto BlockedLoopBody(const TExecRangeParams& params, const TBody& body) { return [=](int blockId) { - const int blockFirstId = params.FirstId + blockId * params.GetBlockSize(); - const int blockLastId = Min(params.LastId, blockFirstId + params.GetBlockSize()); + const int blockFirstId = params.FirstId + blockId * params.GetBlockSize(); + const int blockLastId = Min(params.LastId, blockFirstId + params.GetBlockSize()); for (int i = blockFirstId; i < blockLastId; ++i) { body(i); } @@ -151,10 +151,10 @@ namespace NPar { if (TryExecRangeSequentially(body, params.FirstId, params.LastId, flags)) { return; } - if (params.GetBlockEqualToThreads()) { - params.SetBlockCount(GetThreadCount() + ((flags & WAIT_COMPLETE) != 0)); // ThreadCount or ThreadCount+1 depending on WaitFlag + if (params.GetBlockEqualToThreads()) { + params.SetBlockCount(GetThreadCount() + ((flags & WAIT_COMPLETE) != 0)); // ThreadCount or ThreadCount+1 depending on WaitFlag } - ExecRange(BlockedLoopBody(params, body), 0, params.GetBlockCount(), flags); + ExecRange(BlockedLoopBody(params, body), 0, params.GetBlockCount(), flags); } template <typename TBody> @@ -269,7 +269,7 @@ namespace NPar { THolder<TImpl> Impl_; }; - static inline TLocalExecutor& LocalExecutor() { + static inline TLocalExecutor& LocalExecutor() { return *Singleton<TLocalExecutor>(); } diff --git a/library/cpp/threading/local_executor/ut/local_executor_ut.cpp b/library/cpp/threading/local_executor/ut/local_executor_ut.cpp index ac5737717c..fe7dab0899 100644 --- a/library/cpp/threading/local_executor/ut/local_executor_ut.cpp +++ b/library/cpp/threading/local_executor/ut/local_executor_ut.cpp @@ -1,10 +1,10 @@ #include <library/cpp/threading/local_executor/local_executor.h> #include <library/cpp/threading/future/future.h> - + #include <library/cpp/testing/unittest/registar.h> #include <util/system/mutex.h> #include <util/system/rwlock.h> -#include <util/generic/algorithm.h> +#include <util/generic/algorithm.h> using namespace NPar; @@ -14,7 +14,7 @@ class TTestException: public yexception { static const int DefaultThreadsCount = 41; static const int DefaultRangeSize = 999; -Y_UNIT_TEST_SUITE(ExecRangeWithFutures){ +Y_UNIT_TEST_SUITE(ExecRangeWithFutures){ bool AllOf(const TVector<int>& vec, int value){ return AllOf(vec, [value](int element) { return value == element; }); } @@ -41,23 +41,23 @@ void AsyncRunAndWaitFuturesReady(int rangeSize, int threads) { UNIT_ASSERT(AllOf(data, 1)); } -Y_UNIT_TEST(AsyncRunRangeAndWaitFuturesReady) { +Y_UNIT_TEST(AsyncRunRangeAndWaitFuturesReady) { AsyncRunAndWaitFuturesReady(DefaultRangeSize, DefaultThreadsCount); } -Y_UNIT_TEST(AsyncRunOneTaskAndWaitFuturesReady) { +Y_UNIT_TEST(AsyncRunOneTaskAndWaitFuturesReady) { AsyncRunAndWaitFuturesReady(1, DefaultThreadsCount); } -Y_UNIT_TEST(AsyncRunRangeAndWaitFuturesReadyOneExtraThread) { +Y_UNIT_TEST(AsyncRunRangeAndWaitFuturesReadyOneExtraThread) { AsyncRunAndWaitFuturesReady(DefaultRangeSize, 1); } -Y_UNIT_TEST(AsyncRunOneThreadAndWaitFuturesReadyOneExtraThread) { +Y_UNIT_TEST(AsyncRunOneThreadAndWaitFuturesReadyOneExtraThread) { AsyncRunAndWaitFuturesReady(1, 1); } -Y_UNIT_TEST(AsyncRunTwoRangesAndWaitFuturesReady) { +Y_UNIT_TEST(AsyncRunTwoRangesAndWaitFuturesReady) { TLocalExecutor localExecutor; localExecutor.RunAdditionalThreads(DefaultThreadsCount); TAtomic signal = 0; @@ -118,23 +118,23 @@ void AsyncRunRangeAndWaitExceptions(int rangeSize, int threadsCount) { UNIT_ASSERT(AllOf(data, 1)); } -Y_UNIT_TEST(AsyncRunRangeAndWaitExceptions) { +Y_UNIT_TEST(AsyncRunRangeAndWaitExceptions) { AsyncRunRangeAndWaitExceptions(DefaultRangeSize, DefaultThreadsCount); } -Y_UNIT_TEST(AsyncRunOneTaskAndWaitExceptions) { +Y_UNIT_TEST(AsyncRunOneTaskAndWaitExceptions) { AsyncRunRangeAndWaitExceptions(1, DefaultThreadsCount); } -Y_UNIT_TEST(AsyncRunRangeAndWaitExceptionsOneExtraThread) { +Y_UNIT_TEST(AsyncRunRangeAndWaitExceptionsOneExtraThread) { AsyncRunRangeAndWaitExceptions(DefaultRangeSize, 1); } -Y_UNIT_TEST(AsyncRunOneTaskAndWaitExceptionsOneExtraThread) { +Y_UNIT_TEST(AsyncRunOneTaskAndWaitExceptionsOneExtraThread) { AsyncRunRangeAndWaitExceptions(1, 1); } -Y_UNIT_TEST(AsyncRunTwoRangesAndWaitExceptions) { +Y_UNIT_TEST(AsyncRunTwoRangesAndWaitExceptions) { TLocalExecutor localExecutor; localExecutor.RunAdditionalThreads(DefaultThreadsCount); TAtomic signal = 0; @@ -209,33 +209,33 @@ void RunRangeAndCheckExceptionsWithWaitComplete(int rangeSize, int threadsCount) UNIT_ASSERT(AllOf(data, 1)); } -Y_UNIT_TEST(RunRangeAndCheckExceptionsWithWaitComplete) { +Y_UNIT_TEST(RunRangeAndCheckExceptionsWithWaitComplete) { RunRangeAndCheckExceptionsWithWaitComplete(DefaultRangeSize, DefaultThreadsCount); } -Y_UNIT_TEST(RunOneAndCheckExceptionsWithWaitComplete) { +Y_UNIT_TEST(RunOneAndCheckExceptionsWithWaitComplete) { RunRangeAndCheckExceptionsWithWaitComplete(1, DefaultThreadsCount); } -Y_UNIT_TEST(RunRangeAndCheckExceptionsWithWaitCompleteOneExtraThread) { +Y_UNIT_TEST(RunRangeAndCheckExceptionsWithWaitCompleteOneExtraThread) { RunRangeAndCheckExceptionsWithWaitComplete(DefaultRangeSize, 1); } -Y_UNIT_TEST(RunOneAndCheckExceptionsWithWaitCompleteOneExtraThread) { +Y_UNIT_TEST(RunOneAndCheckExceptionsWithWaitCompleteOneExtraThread) { RunRangeAndCheckExceptionsWithWaitComplete(1, 1); } -Y_UNIT_TEST(RunRangeAndCheckExceptionsWithWaitCompleteZeroExtraThreads) { +Y_UNIT_TEST(RunRangeAndCheckExceptionsWithWaitCompleteZeroExtraThreads) { RunRangeAndCheckExceptionsWithWaitComplete(DefaultRangeSize, 0); } -Y_UNIT_TEST(RunOneAndCheckExceptionsWithWaitCompleteZeroExtraThreads) { +Y_UNIT_TEST(RunOneAndCheckExceptionsWithWaitCompleteZeroExtraThreads) { RunRangeAndCheckExceptionsWithWaitComplete(1, 0); } } ; -Y_UNIT_TEST_SUITE(ExecRangeWithThrow){ +Y_UNIT_TEST_SUITE(ExecRangeWithThrow){ void RunParallelWhichThrowsTTestException(int rangeStart, int rangeSize, int threadsCount, int flags, TAtomic& processed){ AtomicSet(processed, 0); TLocalExecutor localExecutor; @@ -247,7 +247,7 @@ localExecutor.ExecRangeWithThrow([&processed](int) { rangeStart, rangeStart + rangeSize, flags); } -Y_UNIT_TEST(RunParallelWhichThrowsTTestException) { +Y_UNIT_TEST(RunParallelWhichThrowsTTestException) { TAtomic processed = 0; UNIT_ASSERT_EXCEPTION( RunParallelWhichThrowsTTestException(10, 40, DefaultThreadsCount, @@ -264,32 +264,32 @@ void ThrowAndCatchTTestException(int rangeSize, int threadsCount, int flags) { UNIT_ASSERT(AtomicGet(processed) == rangeSize); } -Y_UNIT_TEST(ThrowAndCatchTTestExceptionLowPriority) { +Y_UNIT_TEST(ThrowAndCatchTTestExceptionLowPriority) { ThrowAndCatchTTestException(DefaultRangeSize, DefaultThreadsCount, TLocalExecutor::EFlags::WAIT_COMPLETE | TLocalExecutor::EFlags::LOW_PRIORITY); } -Y_UNIT_TEST(ThrowAndCatchTTestExceptionMedPriority) { +Y_UNIT_TEST(ThrowAndCatchTTestExceptionMedPriority) { ThrowAndCatchTTestException(DefaultRangeSize, DefaultThreadsCount, TLocalExecutor::EFlags::WAIT_COMPLETE | TLocalExecutor::EFlags::MED_PRIORITY); } -Y_UNIT_TEST(ThrowAndCatchTTestExceptionHighPriority) { +Y_UNIT_TEST(ThrowAndCatchTTestExceptionHighPriority) { ThrowAndCatchTTestException(DefaultRangeSize, DefaultThreadsCount, TLocalExecutor::EFlags::WAIT_COMPLETE | TLocalExecutor::EFlags::HIGH_PRIORITY); } -Y_UNIT_TEST(ThrowAndCatchTTestExceptionWaitComplete) { +Y_UNIT_TEST(ThrowAndCatchTTestExceptionWaitComplete) { ThrowAndCatchTTestException(DefaultRangeSize, DefaultThreadsCount, TLocalExecutor::EFlags::WAIT_COMPLETE); } -Y_UNIT_TEST(RethrowExeptionSequentialWaitComplete) { +Y_UNIT_TEST(RethrowExeptionSequentialWaitComplete) { ThrowAndCatchTTestException(DefaultRangeSize, 0, TLocalExecutor::EFlags::WAIT_COMPLETE); } -Y_UNIT_TEST(RethrowExeptionOneExtraThreadWaitComplete) { +Y_UNIT_TEST(RethrowExeptionOneExtraThreadWaitComplete) { ThrowAndCatchTTestException(DefaultRangeSize, 1, TLocalExecutor::EFlags::WAIT_COMPLETE); } @@ -314,7 +314,7 @@ void CatchTTestExceptionFromNested(TAtomic& processed1, TAtomic& processed2) { 0, DefaultRangeSize, TLocalExecutor::EFlags::WAIT_COMPLETE); } -Y_UNIT_TEST(NestedParallelExceptionsDoNotLeak) { +Y_UNIT_TEST(NestedParallelExceptionsDoNotLeak) { TAtomic processed1 = 0; TAtomic processed2 = 0; UNIT_ASSERT_NO_EXCEPTION( diff --git a/library/cpp/threading/local_executor/ut/ya.make b/library/cpp/threading/local_executor/ut/ya.make index be579a5ca0..2983c4f466 100644 --- a/library/cpp/threading/local_executor/ut/ya.make +++ b/library/cpp/threading/local_executor/ut/ya.make @@ -1,10 +1,10 @@ OWNER( g:matrixnet - gulin -) + gulin +) UNITTEST_FOR(library/cpp/threading/local_executor) - + SRCS( local_executor_ut.cpp ) diff --git a/library/cpp/threading/local_executor/ya.make b/library/cpp/threading/local_executor/ya.make index df210f92bb..516be66703 100644 --- a/library/cpp/threading/local_executor/ya.make +++ b/library/cpp/threading/local_executor/ya.make @@ -5,8 +5,8 @@ OWNER( espetrov ) -LIBRARY() - +LIBRARY() + SRCS( local_executor.cpp tbb_local_executor.cpp diff --git a/library/cpp/threading/poor_man_openmp/thread_helper_ut.cpp b/library/cpp/threading/poor_man_openmp/thread_helper_ut.cpp index 7417636864..0f91c1ce4a 100644 --- a/library/cpp/threading/poor_man_openmp/thread_helper_ut.cpp +++ b/library/cpp/threading/poor_man_openmp/thread_helper_ut.cpp @@ -5,8 +5,8 @@ #include <util/generic/string.h> #include <util/generic/yexception.h> -Y_UNIT_TEST_SUITE(TestMP) { - Y_UNIT_TEST(TestErr) { +Y_UNIT_TEST_SUITE(TestMP) { + Y_UNIT_TEST(TestErr) { std::function<void(int)> f = [](int x) { if (x == 5) { ythrow yexception() << "oops"; diff --git a/library/cpp/threading/queue/basic_ut.cpp b/library/cpp/threading/queue/basic_ut.cpp index 5f56f8583e..a52b46c8a6 100644 --- a/library/cpp/threading/queue/basic_ut.cpp +++ b/library/cpp/threading/queue/basic_ut.cpp @@ -51,7 +51,7 @@ public: template <size_t NUMBER_OF_THREADS> void RepeatPush1Pop1_InManyThreads() { - class TCycleThread: public ISimpleThread { + class TCycleThread: public ISimpleThread { public: void* ThreadProc() override { TQueueType queue; diff --git a/library/cpp/threading/queue/queue_ut.cpp b/library/cpp/threading/queue/queue_ut.cpp index 80eca147da..eb77e51e19 100644 --- a/library/cpp/threading/queue/queue_ut.cpp +++ b/library/cpp/threading/queue/queue_ut.cpp @@ -43,7 +43,7 @@ public: void Threads2_Push1M_Threads1_Pop2M() { TQueueType queue; - class TPusherThread: public ISimpleThread { + class TPusherThread: public ISimpleThread { public: TPusherThread(TQueueType& theQueue, char* start) : Queue(theQueue) @@ -81,7 +81,7 @@ public: void Threads4_Push1M_Threads1_Pop4M() { TQueueType queue; - class TPusherThread: public ISimpleThread { + class TPusherThread: public ISimpleThread { public: TPusherThread(TQueueType& theQueue, char* start) : Queue(theQueue) @@ -124,7 +124,7 @@ public: void ManyRndPush100K_ManyQueues() { TQueueType queue[NUMBER_OF_QUEUES]; - class TPusherThread: public ISimpleThread { + class TPusherThread: public ISimpleThread { public: TPusherThread(TQueueType* queues, char* start) : Queues(queues) @@ -155,7 +155,7 @@ public: } }; - class TPopperThread: public ISimpleThread { + class TPopperThread: public ISimpleThread { public: TPopperThread(TQueueType* theQueue, char* base) : Queue(theQueue) diff --git a/library/cpp/threading/queue/tune_ut.cpp b/library/cpp/threading/queue/tune_ut.cpp index 7e980d3e27..34086ccf0f 100644 --- a/library/cpp/threading/queue/tune_ut.cpp +++ b/library/cpp/threading/queue/tune_ut.cpp @@ -19,8 +19,8 @@ DeclareTuneTypeParam(TweakStructB, TStructB); DeclareTuneValueParam(TweakParam1, ui32, Param1); DeclareTuneValueParam(TweakParam2, ui32, Param2); -Y_UNIT_TEST_SUITE(TestTuning) { - Y_UNIT_TEST(Defaults) { +Y_UNIT_TEST_SUITE(TestTuning) { + Y_UNIT_TEST(Defaults) { using TTuned = TTune<TDefaults>; using TunedA = TTuned::TStructA; using TunedB = TTuned::TStructB; @@ -35,7 +35,7 @@ Y_UNIT_TEST_SUITE(TestTuning) { UNIT_ASSERT_EQUAL(param2, 42); } - Y_UNIT_TEST(TuneStructA) { + Y_UNIT_TEST(TuneStructA) { struct TMyStruct { }; @@ -56,7 +56,7 @@ Y_UNIT_TEST_SUITE(TestTuning) { UNIT_ASSERT_EQUAL(param2, 42); } - Y_UNIT_TEST(TuneParam1) { + Y_UNIT_TEST(TuneParam1) { using TTuned = TTune<TDefaults, TweakParam1<24>>; using TunedA = TTuned::TStructA; @@ -72,7 +72,7 @@ Y_UNIT_TEST_SUITE(TestTuning) { UNIT_ASSERT_EQUAL(param2, 42); } - Y_UNIT_TEST(TuneStructAAndParam1) { + Y_UNIT_TEST(TuneStructAAndParam1) { struct TMyStruct { }; @@ -94,7 +94,7 @@ Y_UNIT_TEST_SUITE(TestTuning) { UNIT_ASSERT_EQUAL(param2, 42); } - Y_UNIT_TEST(TuneParam1AndStructA) { + Y_UNIT_TEST(TuneParam1AndStructA) { struct TMyStruct { }; diff --git a/library/cpp/threading/queue/unordered_ut.cpp b/library/cpp/threading/queue/unordered_ut.cpp index a43b7f520e..1310559c46 100644 --- a/library/cpp/threading/queue/unordered_ut.cpp +++ b/library/cpp/threading/queue/unordered_ut.cpp @@ -56,7 +56,7 @@ public: void ManyThreadsRndExchange() { TQueueType queues[COUNT]; - class TWorker: public ISimpleThread { + class TWorker: public ISimpleThread { public: TWorker( TQueueType* queues_, diff --git a/library/cpp/threading/skip_list/skiplist_ut.cpp b/library/cpp/threading/skip_list/skiplist_ut.cpp index 52fcffda66..9c483de136 100644 --- a/library/cpp/threading/skip_list/skiplist_ut.cpp +++ b/library/cpp/threading/skip_list/skiplist_ut.cpp @@ -35,15 +35,15 @@ namespace NThreading { //////////////////////////////////////////////////////////////////////////////// - Y_UNIT_TEST_SUITE(TSkipListTest) { - Y_UNIT_TEST(ShouldBeEmptyAfterCreation) { + Y_UNIT_TEST_SUITE(TSkipListTest) { + Y_UNIT_TEST(ShouldBeEmptyAfterCreation) { TMemoryPool pool(1024); TSkipList<int> list(pool); UNIT_ASSERT_EQUAL(list.GetSize(), 0); } - Y_UNIT_TEST(ShouldAllowInsertion) { + Y_UNIT_TEST(ShouldAllowInsertion) { TMemoryPool pool(1024); TSkipList<int> list(pool); @@ -51,7 +51,7 @@ namespace NThreading { UNIT_ASSERT_EQUAL(list.GetSize(), 1); } - Y_UNIT_TEST(ShouldNotAllowDuplicates) { + Y_UNIT_TEST(ShouldNotAllowDuplicates) { TMemoryPool pool(1024); TSkipList<int> list(pool); @@ -62,7 +62,7 @@ namespace NThreading { UNIT_ASSERT_EQUAL(list.GetSize(), 1); } - Y_UNIT_TEST(ShouldContainInsertedItem) { + Y_UNIT_TEST(ShouldContainInsertedItem) { TMemoryPool pool(1024); TSkipList<int> list(pool); @@ -70,7 +70,7 @@ namespace NThreading { UNIT_ASSERT(list.Contains(12345678)); } - Y_UNIT_TEST(ShouldNotContainNotInsertedItem) { + Y_UNIT_TEST(ShouldNotContainNotInsertedItem) { TMemoryPool pool(1024); TSkipList<int> list(pool); @@ -78,7 +78,7 @@ namespace NThreading { UNIT_ASSERT(!list.Contains(87654321)); } - Y_UNIT_TEST(ShouldIterateAllItems) { + Y_UNIT_TEST(ShouldIterateAllItems) { TMemoryPool pool(1024); TSkipList<int> list(pool); @@ -95,7 +95,7 @@ namespace NThreading { UNIT_ASSERT(!it.IsValid()); } - Y_UNIT_TEST(ShouldIterateAllItemsInReverseDirection) { + Y_UNIT_TEST(ShouldIterateAllItemsInReverseDirection) { TMemoryPool pool(1024); TSkipList<int> list(pool); @@ -112,7 +112,7 @@ namespace NThreading { UNIT_ASSERT(!it.IsValid()); } - Y_UNIT_TEST(ShouldSeekToFirstItem) { + Y_UNIT_TEST(ShouldSeekToFirstItem) { TMemoryPool pool(1024); TSkipList<int> list(pool); @@ -125,7 +125,7 @@ namespace NThreading { UNIT_ASSERT_EQUAL(it.GetValue(), 1); } - Y_UNIT_TEST(ShouldSeekToLastItem) { + Y_UNIT_TEST(ShouldSeekToLastItem) { TMemoryPool pool(1024); TSkipList<int> list(pool); @@ -138,7 +138,7 @@ namespace NThreading { UNIT_ASSERT_EQUAL(it.GetValue(), 9); } - Y_UNIT_TEST(ShouldSeekToExistingItem) { + Y_UNIT_TEST(ShouldSeekToExistingItem) { TMemoryPool pool(1024); TSkipList<int> list(pool); @@ -148,7 +148,7 @@ namespace NThreading { UNIT_ASSERT(it.IsValid()); } - Y_UNIT_TEST(ShouldSeekAfterMissedItem) { + Y_UNIT_TEST(ShouldSeekAfterMissedItem) { TMemoryPool pool(1024); TSkipList<int> list(pool); @@ -164,7 +164,7 @@ namespace NThreading { UNIT_ASSERT_EQUAL(it.GetValue(), 100); } - Y_UNIT_TEST(ShouldCallDtorsOfNonPodTypes) { + Y_UNIT_TEST(ShouldCallDtorsOfNonPodTypes) { UNIT_ASSERT(!TTypeTraits<TTestObject>::IsPod); UNIT_ASSERT_EQUAL(TTestObject::Count, 0); diff --git a/library/cpp/threading/task_scheduler/task_scheduler.cpp b/library/cpp/threading/task_scheduler/task_scheduler.cpp index 174dde4bf7..95bd27d7cf 100644 --- a/library/cpp/threading/task_scheduler/task_scheduler.cpp +++ b/library/cpp/threading/task_scheduler/task_scheduler.cpp @@ -2,7 +2,7 @@ #include <util/system/thread.h> #include <util/string/cast.h> -#include <util/stream/output.h> +#include <util/stream/output.h> TTaskScheduler::ITask::~ITask() {} TTaskScheduler::IRepeatedTask::~IRepeatedTask() {} @@ -10,7 +10,7 @@ TTaskScheduler::IRepeatedTask::~IRepeatedTask() {} class TTaskScheduler::TWorkerThread - : public ISimpleThread + : public ISimpleThread { public: TWorkerThread(TTaskScheduler& state) @@ -152,8 +152,8 @@ const bool debugOutput = false; void TTaskScheduler::ChangeDebugState(TWorkerThread* thread, const TString& state) { if (!debugOutput) { - Y_UNUSED(thread); - Y_UNUSED(state); + Y_UNUSED(thread); + Y_UNUSED(state); return; } diff --git a/library/cpp/threading/task_scheduler/task_scheduler_ut.cpp b/library/cpp/threading/task_scheduler/task_scheduler_ut.cpp index 3b5203194a..8f21984b77 100644 --- a/library/cpp/threading/task_scheduler/task_scheduler_ut.cpp +++ b/library/cpp/threading/task_scheduler/task_scheduler_ut.cpp @@ -1,7 +1,7 @@ #include <algorithm> #include <library/cpp/testing/unittest/registar.h> -#include <util/stream/output.h> +#include <util/stream/output.h> #include <util/system/atomic.h> #include <util/generic/vector.h> |