aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/threading
diff options
context:
space:
mode:
authoryazevnul <yazevnul@yandex-team.ru>2022-02-10 16:46:46 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:46:46 +0300
commit8cbc307de0221f84c80c42dcbe07d40727537e2c (patch)
tree625d5a673015d1df891e051033e9fcde5c7be4e5 /library/cpp/threading
parent30d1ef3941e0dc835be7609de5ebee66958f215a (diff)
downloadydb-8cbc307de0221f84c80c42dcbe07d40727537e2c.tar.gz
Restoring authorship annotation for <yazevnul@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/threading')
-rw-r--r--library/cpp/threading/atomic/bool_ut.cpp4
-rw-r--r--library/cpp/threading/chunk_queue/queue_ut.cpp32
-rw-r--r--library/cpp/threading/equeue/equeue_ut.cpp6
-rw-r--r--library/cpp/threading/future/async_ut.cpp8
-rw-r--r--library/cpp/threading/future/core/future.h4
-rw-r--r--library/cpp/threading/future/future_ut.cpp44
-rw-r--r--library/cpp/threading/future/fwd.cpp2
-rw-r--r--library/cpp/threading/future/fwd.h12
-rw-r--r--library/cpp/threading/future/legacy_future.h4
-rw-r--r--library/cpp/threading/future/legacy_future_ut.cpp10
-rw-r--r--library/cpp/threading/future/wait/wait.h4
-rw-r--r--library/cpp/threading/future/ya.make10
-rw-r--r--library/cpp/threading/local_executor/local_executor.cpp568
-rw-r--r--library/cpp/threading/local_executor/local_executor.h92
-rw-r--r--library/cpp/threading/local_executor/ut/local_executor_ut.cpp56
-rw-r--r--library/cpp/threading/local_executor/ut/ya.make6
-rw-r--r--library/cpp/threading/local_executor/ya.make4
-rw-r--r--library/cpp/threading/poor_man_openmp/thread_helper_ut.cpp4
-rw-r--r--library/cpp/threading/queue/basic_ut.cpp2
-rw-r--r--library/cpp/threading/queue/queue_ut.cpp8
-rw-r--r--library/cpp/threading/queue/tune_ut.cpp12
-rw-r--r--library/cpp/threading/queue/unordered_ut.cpp2
-rw-r--r--library/cpp/threading/skip_list/skiplist_ut.cpp26
-rw-r--r--library/cpp/threading/task_scheduler/task_scheduler.cpp8
-rw-r--r--library/cpp/threading/task_scheduler/task_scheduler_ut.cpp2
25 files changed, 465 insertions, 465 deletions
diff --git a/library/cpp/threading/atomic/bool_ut.cpp b/library/cpp/threading/atomic/bool_ut.cpp
index 9481f41d8d..86ea26a23d 100644
--- a/library/cpp/threading/atomic/bool_ut.cpp
+++ b/library/cpp/threading/atomic/bool_ut.cpp
@@ -2,8 +2,8 @@
#include <library/cpp/testing/unittest/registar.h>
-Y_UNIT_TEST_SUITE(AtomicBool) {
- Y_UNIT_TEST(ReadWrite) {
+Y_UNIT_TEST_SUITE(AtomicBool) {
+ Y_UNIT_TEST(ReadWrite) {
NAtomic::TBool v;
UNIT_ASSERT_VALUES_EQUAL((bool)v, false);
diff --git a/library/cpp/threading/chunk_queue/queue_ut.cpp b/library/cpp/threading/chunk_queue/queue_ut.cpp
index 8cb36d8dd1..3913bd2545 100644
--- a/library/cpp/threading/chunk_queue/queue_ut.cpp
+++ b/library/cpp/threading/chunk_queue/queue_ut.cpp
@@ -7,8 +7,8 @@
namespace NThreading {
////////////////////////////////////////////////////////////////////////////////
- Y_UNIT_TEST_SUITE(TOneOneQueueTest){
- Y_UNIT_TEST(ShouldBeEmptyAtStart){
+ Y_UNIT_TEST_SUITE(TOneOneQueueTest){
+ Y_UNIT_TEST(ShouldBeEmptyAtStart){
TOneOneQueue<int> queue;
int result = 0;
@@ -16,7 +16,7 @@ namespace NThreading {
UNIT_ASSERT(!queue.Dequeue(result));
}
-Y_UNIT_TEST(ShouldReturnEntries) {
+Y_UNIT_TEST(ShouldReturnEntries) {
TOneOneQueue<int> queue;
queue.Enqueue(1);
queue.Enqueue(2);
@@ -39,7 +39,7 @@ Y_UNIT_TEST(ShouldReturnEntries) {
UNIT_ASSERT(!queue.Dequeue(result));
}
-Y_UNIT_TEST(ShouldStoreMultipleChunks) {
+Y_UNIT_TEST(ShouldStoreMultipleChunks) {
TOneOneQueue<int, 100> queue;
for (int i = 0; i < 1000; ++i) {
queue.Enqueue(i);
@@ -57,8 +57,8 @@ Y_UNIT_TEST(ShouldStoreMultipleChunks) {
////////////////////////////////////////////////////////////////////////////////
-Y_UNIT_TEST_SUITE(TManyOneQueueTest){
- Y_UNIT_TEST(ShouldBeEmptyAtStart){
+Y_UNIT_TEST_SUITE(TManyOneQueueTest){
+ Y_UNIT_TEST(ShouldBeEmptyAtStart){
TManyOneQueue<int> queue;
int result;
@@ -66,7 +66,7 @@ UNIT_ASSERT(queue.IsEmpty());
UNIT_ASSERT(!queue.Dequeue(result));
}
-Y_UNIT_TEST(ShouldReturnEntries) {
+Y_UNIT_TEST(ShouldReturnEntries) {
TManyOneQueue<int> queue;
queue.Enqueue(1);
queue.Enqueue(2);
@@ -93,8 +93,8 @@ Y_UNIT_TEST(ShouldReturnEntries) {
////////////////////////////////////////////////////////////////////////////////
-Y_UNIT_TEST_SUITE(TManyManyQueueTest){
- Y_UNIT_TEST(ShouldBeEmptyAtStart){
+Y_UNIT_TEST_SUITE(TManyManyQueueTest){
+ Y_UNIT_TEST(ShouldBeEmptyAtStart){
TManyManyQueue<int> queue;
int result = 0;
@@ -102,7 +102,7 @@ UNIT_ASSERT(queue.IsEmpty());
UNIT_ASSERT(!queue.Dequeue(result));
}
-Y_UNIT_TEST(ShouldReturnEntries) {
+Y_UNIT_TEST(ShouldReturnEntries) {
TManyManyQueue<int> queue;
queue.Enqueue(1);
queue.Enqueue(2);
@@ -129,8 +129,8 @@ Y_UNIT_TEST(ShouldReturnEntries) {
////////////////////////////////////////////////////////////////////////////////
-Y_UNIT_TEST_SUITE(TRelaxedManyOneQueueTest){
- Y_UNIT_TEST(ShouldBeEmptyAtStart){
+Y_UNIT_TEST_SUITE(TRelaxedManyOneQueueTest){
+ Y_UNIT_TEST(ShouldBeEmptyAtStart){
TRelaxedManyOneQueue<int> queue;
int result;
@@ -138,7 +138,7 @@ UNIT_ASSERT(queue.IsEmpty());
UNIT_ASSERT(!queue.Dequeue(result));
}
-Y_UNIT_TEST(ShouldReturnEntries) {
+Y_UNIT_TEST(ShouldReturnEntries) {
TSet<int> items = {1, 2, 3};
TRelaxedManyOneQueue<int> queue;
@@ -167,8 +167,8 @@ Y_UNIT_TEST(ShouldReturnEntries) {
////////////////////////////////////////////////////////////////////////////////
-Y_UNIT_TEST_SUITE(TRelaxedManyManyQueueTest){
- Y_UNIT_TEST(ShouldBeEmptyAtStart){
+Y_UNIT_TEST_SUITE(TRelaxedManyManyQueueTest){
+ Y_UNIT_TEST(ShouldBeEmptyAtStart){
TRelaxedManyManyQueue<int> queue;
int result = 0;
@@ -176,7 +176,7 @@ UNIT_ASSERT(queue.IsEmpty());
UNIT_ASSERT(!queue.Dequeue(result));
}
-Y_UNIT_TEST(ShouldReturnEntries) {
+Y_UNIT_TEST(ShouldReturnEntries) {
TSet<int> items = {1, 2, 3};
TRelaxedManyManyQueue<int> queue;
diff --git a/library/cpp/threading/equeue/equeue_ut.cpp b/library/cpp/threading/equeue/equeue_ut.cpp
index 9cf2aced44..a2072f9a83 100644
--- a/library/cpp/threading/equeue/equeue_ut.cpp
+++ b/library/cpp/threading/equeue/equeue_ut.cpp
@@ -6,7 +6,7 @@
#include <util/datetime/base.h>
#include <util/generic/vector.h>
-Y_UNIT_TEST_SUITE(TElasticQueueTest) {
+Y_UNIT_TEST_SUITE(TElasticQueueTest) {
const size_t MaxQueueSize = 20;
const size_t ThreadCount = 10;
const size_t N = 100000;
@@ -37,7 +37,7 @@ Y_UNIT_TEST_SUITE(TElasticQueueTest) {
//fill test -- fill queue with "endless" jobs
TSystemEvent WaitEvent;
- Y_UNIT_TEST(FillTest) {
+ Y_UNIT_TEST(FillTest) {
Counters.Reset();
struct TWaitJob: public IObjectInQueue {
@@ -91,7 +91,7 @@ Y_UNIT_TEST_SUITE(TElasticQueueTest) {
static size_t TryCounter;
- Y_UNIT_TEST(ConcurrentTest) {
+ Y_UNIT_TEST(ConcurrentTest) {
Counters.Reset();
TryCounter = 0;
diff --git a/library/cpp/threading/future/async_ut.cpp b/library/cpp/threading/future/async_ut.cpp
index a3699744e4..07f2a66951 100644
--- a/library/cpp/threading/future/async_ut.cpp
+++ b/library/cpp/threading/future/async_ut.cpp
@@ -27,15 +27,15 @@ namespace NThreading {
}
-Y_UNIT_TEST_SUITE(Async) {
- Y_UNIT_TEST(ExtensionExample) {
+Y_UNIT_TEST_SUITE(Async) {
+ Y_UNIT_TEST(ExtensionExample) {
TMySuperTaskQueue queue;
auto future = NThreading::Async([]() { return 5; }, queue);
future.Wait();
UNIT_ASSERT_VALUES_EQUAL(future.GetValue(), 5);
}
- Y_UNIT_TEST(WorksWithIMtpQueue) {
+ Y_UNIT_TEST(WorksWithIMtpQueue) {
auto queue = MakeHolder<TThreadPool>();
queue->Start(1);
@@ -44,7 +44,7 @@ Y_UNIT_TEST_SUITE(Async) {
UNIT_ASSERT_VALUES_EQUAL(future.GetValue(), 5);
}
- Y_UNIT_TEST(ProperlyDeducesFutureType) {
+ Y_UNIT_TEST(ProperlyDeducesFutureType) {
// Compileability test
auto queue = CreateThreadPool(1);
diff --git a/library/cpp/threading/future/core/future.h b/library/cpp/threading/future/core/future.h
index 2e82bb953e..4ab56efb30 100644
--- a/library/cpp/threading/future/core/future.h
+++ b/library/cpp/threading/future/core/future.h
@@ -1,7 +1,7 @@
#pragma once
-#include "fwd.h"
-
+#include "fwd.h"
+
#include <util/datetime/base.h>
#include <util/generic/function.h>
#include <util/generic/maybe.h>
diff --git a/library/cpp/threading/future/future_ut.cpp b/library/cpp/threading/future/future_ut.cpp
index 05950a568d..f45fc84e23 100644
--- a/library/cpp/threading/future/future_ut.cpp
+++ b/library/cpp/threading/future/future_ut.cpp
@@ -64,8 +64,8 @@ namespace {
////////////////////////////////////////////////////////////////////////////////
- Y_UNIT_TEST_SUITE(TFutureTest) {
- Y_UNIT_TEST(ShouldInitiallyHasNoValue) {
+ Y_UNIT_TEST_SUITE(TFutureTest) {
+ Y_UNIT_TEST(ShouldInitiallyHasNoValue) {
TPromise<int> promise;
UNIT_ASSERT(!promise.HasValue());
@@ -79,7 +79,7 @@ namespace {
UNIT_ASSERT(!future.HasValue());
}
- Y_UNIT_TEST(ShouldInitiallyHasNoValueVoid) {
+ Y_UNIT_TEST(ShouldInitiallyHasNoValueVoid) {
TPromise<void> promise;
UNIT_ASSERT(!promise.HasValue());
@@ -93,7 +93,7 @@ namespace {
UNIT_ASSERT(!future.HasValue());
}
- Y_UNIT_TEST(ShouldStoreValue) {
+ Y_UNIT_TEST(ShouldStoreValue) {
TPromise<int> promise = NewPromise<int>();
promise.SetValue(123);
UNIT_ASSERT(promise.HasValue());
@@ -108,7 +108,7 @@ namespace {
UNIT_ASSERT_EQUAL(future.GetValue(), 345);
}
- Y_UNIT_TEST(ShouldStoreValueVoid) {
+ Y_UNIT_TEST(ShouldStoreValueVoid) {
TPromise<void> promise = NewPromise();
promise.SetValue();
UNIT_ASSERT(promise.HasValue());
@@ -151,7 +151,7 @@ namespace {
}
};
- Y_UNIT_TEST(ShouldInvokeCallback) {
+ Y_UNIT_TEST(ShouldInvokeCallback) {
TPromise<int> promise = NewPromise<int>();
TTestCallback callback(123);
@@ -163,7 +163,7 @@ namespace {
UNIT_ASSERT_EQUAL(callback.Value, 123 + 456);
}
- Y_UNIT_TEST(ShouldApplyFunc) {
+ Y_UNIT_TEST(ShouldApplyFunc) {
TPromise<int> promise = NewPromise<int>();
TTestCallback callback(123);
@@ -175,7 +175,7 @@ namespace {
UNIT_ASSERT_EQUAL(callback.Value, 123 + 456);
}
- Y_UNIT_TEST(ShouldApplyVoidFunc) {
+ Y_UNIT_TEST(ShouldApplyVoidFunc) {
TPromise<int> promise = NewPromise<int>();
TTestCallback callback(123);
@@ -186,7 +186,7 @@ namespace {
UNIT_ASSERT(future.HasValue());
}
- Y_UNIT_TEST(ShouldApplyFutureFunc) {
+ Y_UNIT_TEST(ShouldApplyFutureFunc) {
TPromise<int> promise = NewPromise<int>();
TTestCallback callback(123);
@@ -198,7 +198,7 @@ namespace {
UNIT_ASSERT_EQUAL(callback.Value, 123 + 456);
}
- Y_UNIT_TEST(ShouldApplyFutureVoidFunc) {
+ Y_UNIT_TEST(ShouldApplyFutureVoidFunc) {
TPromise<int> promise = NewPromise<int>();
TTestCallback callback(123);
@@ -212,7 +212,7 @@ namespace {
UNIT_ASSERT(future.HasValue());
}
- Y_UNIT_TEST(ShouldIgnoreResultIfAsked) {
+ Y_UNIT_TEST(ShouldIgnoreResultIfAsked) {
TPromise<int> promise = NewPromise<int>();
TTestCallback callback(123);
@@ -225,7 +225,7 @@ namespace {
class TCustomException: public yexception {
};
- Y_UNIT_TEST(ShouldRethrowException) {
+ Y_UNIT_TEST(ShouldRethrowException) {
TPromise<int> promise = NewPromise<int>();
try {
ythrow TCustomException();
@@ -335,7 +335,7 @@ namespace {
UNIT_ASSERT(future.HasValue());
}
- Y_UNIT_TEST(ShouldWaitAnyVector) {
+ Y_UNIT_TEST(ShouldWaitAnyVector) {
TPromise<void> promise1 = NewPromise();
TPromise<void> promise2 = NewPromise();
@@ -372,7 +372,7 @@ namespace {
UNIT_ASSERT(future.HasValue());
}
- Y_UNIT_TEST(ShouldWaitAnyList) {
+ Y_UNIT_TEST(ShouldWaitAnyList) {
TPromise<void> promise1 = NewPromise();
TPromise<void> promise2 = NewPromise();
@@ -390,14 +390,14 @@ namespace {
UNIT_ASSERT(future.HasValue());
}
- Y_UNIT_TEST(ShouldWaitAnyVectorEmpty) {
+ Y_UNIT_TEST(ShouldWaitAnyVectorEmpty) {
TVector<TFuture<void>> promises;
TFuture<void> future = WaitAny(promises);
UNIT_ASSERT(future.HasValue());
}
- Y_UNIT_TEST(ShouldWaitAny) {
+ Y_UNIT_TEST(ShouldWaitAny) {
TPromise<void> promise1 = NewPromise();
TPromise<void> promise2 = NewPromise();
@@ -411,7 +411,7 @@ namespace {
UNIT_ASSERT(future.HasValue());
}
- Y_UNIT_TEST(ShouldStoreTypesWithoutDefaultConstructor) {
+ Y_UNIT_TEST(ShouldStoreTypesWithoutDefaultConstructor) {
// compileability test
struct TRec {
explicit TRec(int) {
@@ -426,7 +426,7 @@ namespace {
Y_UNUSED(rec);
}
- Y_UNIT_TEST(ShouldStoreMovableTypes) {
+ Y_UNIT_TEST(ShouldStoreMovableTypes) {
// compileability test
struct TRec : TMoveOnly {
explicit TRec(int) {
@@ -441,7 +441,7 @@ namespace {
Y_UNUSED(rec);
}
- Y_UNIT_TEST(ShouldMoveMovableTypes) {
+ Y_UNIT_TEST(ShouldMoveMovableTypes) {
// compileability test
struct TRec : TMoveOnly {
explicit TRec(int) {
@@ -456,7 +456,7 @@ namespace {
Y_UNUSED(rec);
}
- Y_UNIT_TEST(ShouldNotExtractAfterGet) {
+ Y_UNIT_TEST(ShouldNotExtractAfterGet) {
TPromise<int> promise = NewPromise<int>();
promise.SetValue(123);
UNIT_ASSERT(promise.HasValue());
@@ -464,7 +464,7 @@ namespace {
UNIT_CHECK_GENERATED_EXCEPTION(promise.ExtractValue(), TFutureException);
}
- Y_UNIT_TEST(ShouldNotGetAfterExtract) {
+ Y_UNIT_TEST(ShouldNotGetAfterExtract) {
TPromise<int> promise = NewPromise<int>();
promise.SetValue(123);
UNIT_ASSERT(promise.HasValue());
@@ -472,7 +472,7 @@ namespace {
UNIT_CHECK_GENERATED_EXCEPTION(promise.GetValue(), TFutureException);
}
- Y_UNIT_TEST(ShouldNotExtractAfterExtract) {
+ Y_UNIT_TEST(ShouldNotExtractAfterExtract) {
TPromise<int> promise = NewPromise<int>();
promise.SetValue(123);
UNIT_ASSERT(promise.HasValue());
diff --git a/library/cpp/threading/future/fwd.cpp b/library/cpp/threading/future/fwd.cpp
index 4214b6df83..2261ef316c 100644
--- a/library/cpp/threading/future/fwd.cpp
+++ b/library/cpp/threading/future/fwd.cpp
@@ -1 +1 @@
-#include "fwd.h"
+#include "fwd.h"
diff --git a/library/cpp/threading/future/fwd.h b/library/cpp/threading/future/fwd.h
index 0cd25dd288..b51d9b0019 100644
--- a/library/cpp/threading/future/fwd.h
+++ b/library/cpp/threading/future/fwd.h
@@ -1,8 +1,8 @@
-#pragma once
-
+#pragma once
+
#include "core/fwd.h"
-namespace NThreading {
- template <typename TR = void, bool IgnoreException = false>
- class TLegacyFuture;
-}
+namespace NThreading {
+ template <typename TR = void, bool IgnoreException = false>
+ class TLegacyFuture;
+}
diff --git a/library/cpp/threading/future/legacy_future.h b/library/cpp/threading/future/legacy_future.h
index 6f1eabad73..9bb126e76b 100644
--- a/library/cpp/threading/future/legacy_future.h
+++ b/library/cpp/threading/future/legacy_future.h
@@ -1,6 +1,6 @@
#pragma once
-#include "fwd.h"
+#include "fwd.h"
#include "future.h"
#include <util/thread/factory.h>
@@ -8,7 +8,7 @@
#include <functional>
namespace NThreading {
- template <typename TR, bool IgnoreException>
+ template <typename TR, bool IgnoreException>
class TLegacyFuture: public IThreadFactory::IThreadAble, TNonCopyable {
public:
typedef TR(TFunctionSignature)();
diff --git a/library/cpp/threading/future/legacy_future_ut.cpp b/library/cpp/threading/future/legacy_future_ut.cpp
index ff63db1725..b0c9dc21aa 100644
--- a/library/cpp/threading/future/legacy_future_ut.cpp
+++ b/library/cpp/threading/future/legacy_future_ut.cpp
@@ -3,12 +3,12 @@
#include <library/cpp/testing/unittest/registar.h>
namespace NThreading {
- Y_UNIT_TEST_SUITE(TLegacyFutureTest) {
+ Y_UNIT_TEST_SUITE(TLegacyFutureTest) {
int intf() {
return 17;
}
- Y_UNIT_TEST(TestIntFunction) {
+ Y_UNIT_TEST(TestIntFunction) {
TLegacyFuture<int> f((&intf));
UNIT_ASSERT_VALUES_EQUAL(17, f.Get());
}
@@ -19,7 +19,7 @@ namespace NThreading {
r = 18;
}
- Y_UNIT_TEST(TestVoidFunction) {
+ Y_UNIT_TEST(TestVoidFunction) {
r = 0;
TLegacyFuture<> f((&voidf));
f.Get();
@@ -39,7 +39,7 @@ namespace NThreading {
}
};
- Y_UNIT_TEST(TestMethod) {
+ Y_UNIT_TEST(TestMethod) {
TLegacyFuture<int> f11(std::bind(&TSampleClass::Calc, TSampleClass(3)));
UNIT_ASSERT_VALUES_EQUAL(4, f11.Get());
@@ -57,7 +57,7 @@ namespace NThreading {
struct TSomeThreadPool: public IThreadFactory {};
- Y_UNIT_TEST(TestFunction) {
+ Y_UNIT_TEST(TestFunction) {
std::function<int()> f((&intf));
UNIT_ASSERT_VALUES_EQUAL(17, TLegacyFuture<int>(f).Get());
diff --git a/library/cpp/threading/future/wait/wait.h b/library/cpp/threading/future/wait/wait.h
index 6ff7d57baa..6497574cec 100644
--- a/library/cpp/threading/future/wait/wait.h
+++ b/library/cpp/threading/future/wait/wait.h
@@ -1,7 +1,7 @@
#pragma once
-#include "fwd.h"
-
+#include "fwd.h"
+
#include <library/cpp/threading/future/core/future.h>
#include <library/cpp/threading/future/wait/wait_group.h>
diff --git a/library/cpp/threading/future/ya.make b/library/cpp/threading/future/ya.make
index 6591031f46..d3ad13fa8e 100644
--- a/library/cpp/threading/future/ya.make
+++ b/library/cpp/threading/future/ya.make
@@ -1,14 +1,14 @@
-OWNER(
- g:rtmr
-)
-
+OWNER(
+ g:rtmr
+)
+
LIBRARY()
SRCS(
async.cpp
core/future.cpp
core/fwd.cpp
- fwd.cpp
+ fwd.cpp
wait/fwd.cpp
wait/wait.cpp
wait/wait_group.cpp
diff --git a/library/cpp/threading/local_executor/local_executor.cpp b/library/cpp/threading/local_executor/local_executor.cpp
index 1d3fbb4bf4..6e62d09d85 100644
--- a/library/cpp/threading/local_executor/local_executor.cpp
+++ b/library/cpp/threading/local_executor/local_executor.cpp
@@ -1,17 +1,17 @@
#include "local_executor.h"
#include <library/cpp/threading/future/future.h>
-
-#include <util/generic/utility.h>
-#include <util/system/atomic.h>
-#include <util/system/event.h>
+
+#include <util/generic/utility.h>
+#include <util/system/atomic.h>
+#include <util/system/event.h>
#include <util/system/thread.h>
-#include <util/system/tls.h>
+#include <util/system/tls.h>
#include <util/system/yield.h>
-#include <util/thread/lfqueue.h>
-
-#include <utility>
+#include <util/thread/lfqueue.h>
+#include <utility>
+
#ifdef _win_
static void RegularYield() {
}
@@ -23,11 +23,11 @@ static void RegularYield() {
}
#endif
-namespace {
- struct TFunctionWrapper : NPar::ILocallyExecutable {
- NPar::TLocallyExecutableFunction Exec;
- TFunctionWrapper(NPar::TLocallyExecutableFunction exec)
- : Exec(std::move(exec))
+namespace {
+ struct TFunctionWrapper : NPar::ILocallyExecutable {
+ NPar::TLocallyExecutableFunction Exec;
+ TFunctionWrapper(NPar::TLocallyExecutableFunction exec)
+ : Exec(std::move(exec))
{
}
void LocalExec(int id) override {
@@ -35,15 +35,15 @@ namespace {
}
};
- class TFunctionWrapperWithPromise: public NPar::ILocallyExecutable {
+ class TFunctionWrapperWithPromise: public NPar::ILocallyExecutable {
private:
- NPar::TLocallyExecutableFunction Exec;
+ NPar::TLocallyExecutableFunction Exec;
int FirstId, LastId;
TVector<NThreading::TPromise<void>> Promises;
public:
- TFunctionWrapperWithPromise(NPar::TLocallyExecutableFunction exec, int firstId, int lastId)
- : Exec(std::move(exec))
+ TFunctionWrapperWithPromise(NPar::TLocallyExecutableFunction exec, int firstId, int lastId)
+ : Exec(std::move(exec))
, FirstId(firstId)
, LastId(lastId)
{
@@ -70,300 +70,300 @@ namespace {
}
};
- struct TSingleJob {
- TIntrusivePtr<NPar::ILocallyExecutable> Exec;
- int Id{0};
+ struct TSingleJob {
+ TIntrusivePtr<NPar::ILocallyExecutable> Exec;
+ int Id{0};
- TSingleJob() = default;
- TSingleJob(TIntrusivePtr<NPar::ILocallyExecutable> exec, int id)
- : Exec(std::move(exec))
- , Id(id)
- {
+ TSingleJob() = default;
+ TSingleJob(TIntrusivePtr<NPar::ILocallyExecutable> exec, int id)
+ : Exec(std::move(exec))
+ , Id(id)
+ {
}
- };
+ };
- class TLocalRangeExecutor: public NPar::ILocallyExecutable {
- TIntrusivePtr<NPar::ILocallyExecutable> Exec;
+ class TLocalRangeExecutor: public NPar::ILocallyExecutable {
+ TIntrusivePtr<NPar::ILocallyExecutable> Exec;
alignas(64) TAtomic Counter;
alignas(64) TAtomic WorkerCount;
- int LastId;
-
- void LocalExec(int) override {
- AtomicAdd(WorkerCount, 1);
- for (;;) {
- if (!DoSingleOp())
- break;
- }
- AtomicAdd(WorkerCount, -1);
+ int LastId;
+
+ void LocalExec(int) override {
+ AtomicAdd(WorkerCount, 1);
+ for (;;) {
+ if (!DoSingleOp())
+ break;
+ }
+ AtomicAdd(WorkerCount, -1);
}
- public:
- TLocalRangeExecutor(TIntrusivePtr<ILocallyExecutable> exec, int firstId, int lastId)
- : Exec(std::move(exec))
- , Counter(firstId)
- , WorkerCount(0)
- , LastId(lastId)
- {
+ public:
+ TLocalRangeExecutor(TIntrusivePtr<ILocallyExecutable> exec, int firstId, int lastId)
+ : Exec(std::move(exec))
+ , Counter(firstId)
+ , WorkerCount(0)
+ , LastId(lastId)
+ {
}
- bool DoSingleOp() {
+ bool DoSingleOp() {
const int id = AtomicAdd(Counter, 1) - 1;
- if (id >= LastId)
- return false;
- Exec->LocalExec(id);
- RegularYield();
- return true;
+ if (id >= LastId)
+ return false;
+ Exec->LocalExec(id);
+ RegularYield();
+ return true;
}
- void WaitComplete() {
- while (AtomicGet(WorkerCount) > 0)
- RegularYield();
+ void WaitComplete() {
+ while (AtomicGet(WorkerCount) > 0)
+ RegularYield();
}
- int GetRangeSize() const {
- return Max<int>(LastId - Counter, 0);
+ int GetRangeSize() const {
+ return Max<int>(LastId - Counter, 0);
}
- };
+ };
}
-
-//////////////////////////////////////////////////////////////////////////
-class NPar::TLocalExecutor::TImpl {
-public:
- TLockFreeQueue<TSingleJob> JobQueue;
- TLockFreeQueue<TSingleJob> MedJobQueue;
- TLockFreeQueue<TSingleJob> LowJobQueue;
+
+//////////////////////////////////////////////////////////////////////////
+class NPar::TLocalExecutor::TImpl {
+public:
+ TLockFreeQueue<TSingleJob> JobQueue;
+ TLockFreeQueue<TSingleJob> MedJobQueue;
+ TLockFreeQueue<TSingleJob> LowJobQueue;
alignas(64) TSystemEvent HasJob;
-
- TAtomic ThreadCount{0};
+
+ TAtomic ThreadCount{0};
alignas(64) TAtomic QueueSize{0};
- TAtomic MPQueueSize{0};
- TAtomic LPQueueSize{0};
- TAtomic ThreadId{0};
-
- Y_THREAD(int)
- CurrentTaskPriority;
- Y_THREAD(int)
- WorkerThreadId;
-
- static void* HostWorkerThread(void* p);
- bool GetJob(TSingleJob* job);
- void RunNewThread();
- void LaunchRange(TIntrusivePtr<TLocalRangeExecutor> execRange, int queueSizeLimit,
- TAtomic* queueSize, TLockFreeQueue<TSingleJob>* jobQueue);
-
- TImpl() = default;
- ~TImpl();
-};
-
-NPar::TLocalExecutor::TImpl::~TImpl() {
- AtomicAdd(QueueSize, 1);
- JobQueue.Enqueue(TSingleJob(nullptr, 0));
- HasJob.Signal();
- while (AtomicGet(ThreadCount)) {
- ThreadYield();
- }
-}
-
-void* NPar::TLocalExecutor::TImpl::HostWorkerThread(void* p) {
- static const int FAST_ITERATIONS = 200;
-
- auto* const ctx = (TImpl*)p;
+ TAtomic MPQueueSize{0};
+ TAtomic LPQueueSize{0};
+ TAtomic ThreadId{0};
+
+ Y_THREAD(int)
+ CurrentTaskPriority;
+ Y_THREAD(int)
+ WorkerThreadId;
+
+ static void* HostWorkerThread(void* p);
+ bool GetJob(TSingleJob* job);
+ void RunNewThread();
+ void LaunchRange(TIntrusivePtr<TLocalRangeExecutor> execRange, int queueSizeLimit,
+ TAtomic* queueSize, TLockFreeQueue<TSingleJob>* jobQueue);
+
+ TImpl() = default;
+ ~TImpl();
+};
+
+NPar::TLocalExecutor::TImpl::~TImpl() {
+ AtomicAdd(QueueSize, 1);
+ JobQueue.Enqueue(TSingleJob(nullptr, 0));
+ HasJob.Signal();
+ while (AtomicGet(ThreadCount)) {
+ ThreadYield();
+ }
+}
+
+void* NPar::TLocalExecutor::TImpl::HostWorkerThread(void* p) {
+ static const int FAST_ITERATIONS = 200;
+
+ auto* const ctx = (TImpl*)p;
TThread::SetCurrentThreadName("ParLocalExecutor");
- ctx->WorkerThreadId = AtomicAdd(ctx->ThreadId, 1);
- for (bool cont = true; cont;) {
- TSingleJob job;
- bool gotJob = false;
- for (int iter = 0; iter < FAST_ITERATIONS; ++iter) {
- if (ctx->GetJob(&job)) {
- gotJob = true;
- break;
- }
- }
- if (!gotJob) {
- ctx->HasJob.Reset();
- if (!ctx->GetJob(&job)) {
- ctx->HasJob.Wait();
- continue;
- }
- }
- if (job.Exec.Get()) {
- job.Exec->LocalExec(job.Id);
- RegularYield();
- } else {
- AtomicAdd(ctx->QueueSize, 1);
- ctx->JobQueue.Enqueue(job);
- ctx->HasJob.Signal();
- cont = false;
- }
- }
- AtomicAdd(ctx->ThreadCount, -1);
- return nullptr;
-}
-
-bool NPar::TLocalExecutor::TImpl::GetJob(TSingleJob* job) {
- if (JobQueue.Dequeue(job)) {
- CurrentTaskPriority = TLocalExecutor::HIGH_PRIORITY;
- AtomicAdd(QueueSize, -1);
- return true;
- } else if (MedJobQueue.Dequeue(job)) {
- CurrentTaskPriority = TLocalExecutor::MED_PRIORITY;
- AtomicAdd(MPQueueSize, -1);
- return true;
- } else if (LowJobQueue.Dequeue(job)) {
- CurrentTaskPriority = TLocalExecutor::LOW_PRIORITY;
- AtomicAdd(LPQueueSize, -1);
- return true;
- }
- return false;
-}
-
-void NPar::TLocalExecutor::TImpl::RunNewThread() {
- AtomicAdd(ThreadCount, 1);
- TThread thr(HostWorkerThread, this);
- thr.Start();
- thr.Detach();
-}
-
-void NPar::TLocalExecutor::TImpl::LaunchRange(TIntrusivePtr<TLocalRangeExecutor> rangeExec,
- int queueSizeLimit,
- TAtomic* queueSize,
- TLockFreeQueue<TSingleJob>* jobQueue) {
- int count = Min<int>(ThreadCount + 1, rangeExec->GetRangeSize());
- if (queueSizeLimit >= 0 && AtomicGet(*queueSize) >= queueSizeLimit) {
- return;
- }
- AtomicAdd(*queueSize, count);
+ ctx->WorkerThreadId = AtomicAdd(ctx->ThreadId, 1);
+ for (bool cont = true; cont;) {
+ TSingleJob job;
+ bool gotJob = false;
+ for (int iter = 0; iter < FAST_ITERATIONS; ++iter) {
+ if (ctx->GetJob(&job)) {
+ gotJob = true;
+ break;
+ }
+ }
+ if (!gotJob) {
+ ctx->HasJob.Reset();
+ if (!ctx->GetJob(&job)) {
+ ctx->HasJob.Wait();
+ continue;
+ }
+ }
+ if (job.Exec.Get()) {
+ job.Exec->LocalExec(job.Id);
+ RegularYield();
+ } else {
+ AtomicAdd(ctx->QueueSize, 1);
+ ctx->JobQueue.Enqueue(job);
+ ctx->HasJob.Signal();
+ cont = false;
+ }
+ }
+ AtomicAdd(ctx->ThreadCount, -1);
+ return nullptr;
+}
+
+bool NPar::TLocalExecutor::TImpl::GetJob(TSingleJob* job) {
+ if (JobQueue.Dequeue(job)) {
+ CurrentTaskPriority = TLocalExecutor::HIGH_PRIORITY;
+ AtomicAdd(QueueSize, -1);
+ return true;
+ } else if (MedJobQueue.Dequeue(job)) {
+ CurrentTaskPriority = TLocalExecutor::MED_PRIORITY;
+ AtomicAdd(MPQueueSize, -1);
+ return true;
+ } else if (LowJobQueue.Dequeue(job)) {
+ CurrentTaskPriority = TLocalExecutor::LOW_PRIORITY;
+ AtomicAdd(LPQueueSize, -1);
+ return true;
+ }
+ return false;
+}
+
+void NPar::TLocalExecutor::TImpl::RunNewThread() {
+ AtomicAdd(ThreadCount, 1);
+ TThread thr(HostWorkerThread, this);
+ thr.Start();
+ thr.Detach();
+}
+
+void NPar::TLocalExecutor::TImpl::LaunchRange(TIntrusivePtr<TLocalRangeExecutor> rangeExec,
+ int queueSizeLimit,
+ TAtomic* queueSize,
+ TLockFreeQueue<TSingleJob>* jobQueue) {
+ int count = Min<int>(ThreadCount + 1, rangeExec->GetRangeSize());
+ if (queueSizeLimit >= 0 && AtomicGet(*queueSize) >= queueSizeLimit) {
+ return;
+ }
+ AtomicAdd(*queueSize, count);
jobQueue->EnqueueAll(TVector<TSingleJob>{size_t(count), TSingleJob(rangeExec, 0)});
- HasJob.Signal();
-}
-
-NPar::TLocalExecutor::TLocalExecutor()
- : Impl_{MakeHolder<TImpl>()} {
-}
-
-NPar::TLocalExecutor::~TLocalExecutor() = default;
-
-void NPar::TLocalExecutor::RunAdditionalThreads(int threadCount) {
- for (int i = 0; i < threadCount; i++)
- Impl_->RunNewThread();
-}
-
-void NPar::TLocalExecutor::Exec(TIntrusivePtr<ILocallyExecutable> exec, int id, int flags) {
- Y_ASSERT((flags & WAIT_COMPLETE) == 0); // unsupported
- int prior = Max<int>(Impl_->CurrentTaskPriority, flags & PRIORITY_MASK);
- switch (prior) {
- case HIGH_PRIORITY:
- AtomicAdd(Impl_->QueueSize, 1);
- Impl_->JobQueue.Enqueue(TSingleJob(std::move(exec), id));
- break;
- case MED_PRIORITY:
- AtomicAdd(Impl_->MPQueueSize, 1);
- Impl_->MedJobQueue.Enqueue(TSingleJob(std::move(exec), id));
- break;
- case LOW_PRIORITY:
- AtomicAdd(Impl_->LPQueueSize, 1);
- Impl_->LowJobQueue.Enqueue(TSingleJob(std::move(exec), id));
- break;
- default:
- Y_ASSERT(0);
- break;
- }
- Impl_->HasJob.Signal();
-}
-
+ HasJob.Signal();
+}
+
+NPar::TLocalExecutor::TLocalExecutor()
+ : Impl_{MakeHolder<TImpl>()} {
+}
+
+NPar::TLocalExecutor::~TLocalExecutor() = default;
+
+void NPar::TLocalExecutor::RunAdditionalThreads(int threadCount) {
+ for (int i = 0; i < threadCount; i++)
+ Impl_->RunNewThread();
+}
+
+void NPar::TLocalExecutor::Exec(TIntrusivePtr<ILocallyExecutable> exec, int id, int flags) {
+ Y_ASSERT((flags & WAIT_COMPLETE) == 0); // unsupported
+ int prior = Max<int>(Impl_->CurrentTaskPriority, flags & PRIORITY_MASK);
+ switch (prior) {
+ case HIGH_PRIORITY:
+ AtomicAdd(Impl_->QueueSize, 1);
+ Impl_->JobQueue.Enqueue(TSingleJob(std::move(exec), id));
+ break;
+ case MED_PRIORITY:
+ AtomicAdd(Impl_->MPQueueSize, 1);
+ Impl_->MedJobQueue.Enqueue(TSingleJob(std::move(exec), id));
+ break;
+ case LOW_PRIORITY:
+ AtomicAdd(Impl_->LPQueueSize, 1);
+ Impl_->LowJobQueue.Enqueue(TSingleJob(std::move(exec), id));
+ break;
+ default:
+ Y_ASSERT(0);
+ break;
+ }
+ Impl_->HasJob.Signal();
+}
+
void NPar::ILocalExecutor::Exec(TLocallyExecutableFunction exec, int id, int flags) {
- Exec(new TFunctionWrapper(std::move(exec)), id, flags);
-}
-
-void NPar::TLocalExecutor::ExecRange(TIntrusivePtr<ILocallyExecutable> exec, int firstId, int lastId, int flags) {
- Y_ASSERT(lastId >= firstId);
+ Exec(new TFunctionWrapper(std::move(exec)), id, flags);
+}
+
+void NPar::TLocalExecutor::ExecRange(TIntrusivePtr<ILocallyExecutable> exec, int firstId, int lastId, int flags) {
+ Y_ASSERT(lastId >= firstId);
if (TryExecRangeSequentially([=] (int id) { exec->LocalExec(id); }, firstId, lastId, flags)) {
- return;
- }
- auto rangeExec = MakeIntrusive<TLocalRangeExecutor>(std::move(exec), firstId, lastId);
- int queueSizeLimit = (flags & WAIT_COMPLETE) ? 10000 : -1;
- int prior = Max<int>(Impl_->CurrentTaskPriority, flags & PRIORITY_MASK);
- switch (prior) {
- case HIGH_PRIORITY:
- Impl_->LaunchRange(rangeExec, queueSizeLimit, &Impl_->QueueSize, &Impl_->JobQueue);
- break;
- case MED_PRIORITY:
- Impl_->LaunchRange(rangeExec, queueSizeLimit, &Impl_->MPQueueSize, &Impl_->MedJobQueue);
- break;
- case LOW_PRIORITY:
- Impl_->LaunchRange(rangeExec, queueSizeLimit, &Impl_->LPQueueSize, &Impl_->LowJobQueue);
- break;
- default:
- Y_ASSERT(0);
- break;
- }
- if (flags & WAIT_COMPLETE) {
- int keepPrior = Impl_->CurrentTaskPriority;
- Impl_->CurrentTaskPriority = prior;
- while (rangeExec->DoSingleOp()) {
- }
- Impl_->CurrentTaskPriority = keepPrior;
- rangeExec->WaitComplete();
- }
-}
-
+ return;
+ }
+ auto rangeExec = MakeIntrusive<TLocalRangeExecutor>(std::move(exec), firstId, lastId);
+ int queueSizeLimit = (flags & WAIT_COMPLETE) ? 10000 : -1;
+ int prior = Max<int>(Impl_->CurrentTaskPriority, flags & PRIORITY_MASK);
+ switch (prior) {
+ case HIGH_PRIORITY:
+ Impl_->LaunchRange(rangeExec, queueSizeLimit, &Impl_->QueueSize, &Impl_->JobQueue);
+ break;
+ case MED_PRIORITY:
+ Impl_->LaunchRange(rangeExec, queueSizeLimit, &Impl_->MPQueueSize, &Impl_->MedJobQueue);
+ break;
+ case LOW_PRIORITY:
+ Impl_->LaunchRange(rangeExec, queueSizeLimit, &Impl_->LPQueueSize, &Impl_->LowJobQueue);
+ break;
+ default:
+ Y_ASSERT(0);
+ break;
+ }
+ if (flags & WAIT_COMPLETE) {
+ int keepPrior = Impl_->CurrentTaskPriority;
+ Impl_->CurrentTaskPriority = prior;
+ while (rangeExec->DoSingleOp()) {
+ }
+ Impl_->CurrentTaskPriority = keepPrior;
+ rangeExec->WaitComplete();
+ }
+}
+
void NPar::ILocalExecutor::ExecRange(TLocallyExecutableFunction exec, int firstId, int lastId, int flags) {
if (TryExecRangeSequentially(exec, firstId, lastId, flags)) {
return;
}
- ExecRange(new TFunctionWrapper(exec), firstId, lastId, flags);
-}
-
+ ExecRange(new TFunctionWrapper(exec), firstId, lastId, flags);
+}
+
void NPar::ILocalExecutor::ExecRangeWithThrow(TLocallyExecutableFunction exec, int firstId, int lastId, int flags) {
- Y_VERIFY((flags & WAIT_COMPLETE) != 0, "ExecRangeWithThrow() requires WAIT_COMPLETE to wait if exceptions arise.");
+ Y_VERIFY((flags & WAIT_COMPLETE) != 0, "ExecRangeWithThrow() requires WAIT_COMPLETE to wait if exceptions arise.");
if (TryExecRangeSequentially(exec, firstId, lastId, flags)) {
return;
}
- TVector<NThreading::TFuture<void>> currentRun = ExecRangeWithFutures(exec, firstId, lastId, flags);
- for (auto& result : currentRun) {
- result.GetValueSync(); // Exception will be rethrown if exists. If several exception - only the one with minimal id is rethrown.
- }
-}
-
-TVector<NThreading::TFuture<void>>
+ TVector<NThreading::TFuture<void>> currentRun = ExecRangeWithFutures(exec, firstId, lastId, flags);
+ for (auto& result : currentRun) {
+ result.GetValueSync(); // Exception will be rethrown if exists. If several exception - only the one with minimal id is rethrown.
+ }
+}
+
+TVector<NThreading::TFuture<void>>
NPar::ILocalExecutor::ExecRangeWithFutures(TLocallyExecutableFunction exec, int firstId, int lastId, int flags) {
- TFunctionWrapperWithPromise* execWrapper = new TFunctionWrapperWithPromise(exec, firstId, lastId);
- TVector<NThreading::TFuture<void>> out = execWrapper->GetFutures();
- ExecRange(execWrapper, firstId, lastId, flags);
- return out;
-}
-
-void NPar::TLocalExecutor::ClearLPQueue() {
- for (bool cont = true; cont;) {
- cont = false;
- TSingleJob job;
- while (Impl_->LowJobQueue.Dequeue(&job)) {
- AtomicAdd(Impl_->LPQueueSize, -1);
- cont = true;
- }
- while (Impl_->MedJobQueue.Dequeue(&job)) {
- AtomicAdd(Impl_->MPQueueSize, -1);
- cont = true;
- }
- }
-}
-
-int NPar::TLocalExecutor::GetQueueSize() const noexcept {
- return AtomicGet(Impl_->QueueSize);
-}
-
-int NPar::TLocalExecutor::GetMPQueueSize() const noexcept {
- return AtomicGet(Impl_->MPQueueSize);
-}
-
-int NPar::TLocalExecutor::GetLPQueueSize() const noexcept {
- return AtomicGet(Impl_->LPQueueSize);
-}
-
+ TFunctionWrapperWithPromise* execWrapper = new TFunctionWrapperWithPromise(exec, firstId, lastId);
+ TVector<NThreading::TFuture<void>> out = execWrapper->GetFutures();
+ ExecRange(execWrapper, firstId, lastId, flags);
+ return out;
+}
+
+void NPar::TLocalExecutor::ClearLPQueue() {
+ for (bool cont = true; cont;) {
+ cont = false;
+ TSingleJob job;
+ while (Impl_->LowJobQueue.Dequeue(&job)) {
+ AtomicAdd(Impl_->LPQueueSize, -1);
+ cont = true;
+ }
+ while (Impl_->MedJobQueue.Dequeue(&job)) {
+ AtomicAdd(Impl_->MPQueueSize, -1);
+ cont = true;
+ }
+ }
+}
+
+int NPar::TLocalExecutor::GetQueueSize() const noexcept {
+ return AtomicGet(Impl_->QueueSize);
+}
+
+int NPar::TLocalExecutor::GetMPQueueSize() const noexcept {
+ return AtomicGet(Impl_->MPQueueSize);
+}
+
+int NPar::TLocalExecutor::GetLPQueueSize() const noexcept {
+ return AtomicGet(Impl_->LPQueueSize);
+}
+
int NPar::TLocalExecutor::GetWorkerThreadId() const noexcept {
- return Impl_->WorkerThreadId;
-}
-
-int NPar::TLocalExecutor::GetThreadCount() const noexcept {
- return AtomicGet(Impl_->ThreadCount);
-}
-
-//////////////////////////////////////////////////////////////////////////
+ return Impl_->WorkerThreadId;
+}
+
+int NPar::TLocalExecutor::GetThreadCount() const noexcept {
+ return AtomicGet(Impl_->ThreadCount);
+}
+
+//////////////////////////////////////////////////////////////////////////
diff --git a/library/cpp/threading/local_executor/local_executor.h b/library/cpp/threading/local_executor/local_executor.h
index c1c824f67c..aa500d34d3 100644
--- a/library/cpp/threading/local_executor/local_executor.h
+++ b/library/cpp/threading/local_executor/local_executor.h
@@ -1,23 +1,23 @@
#pragma once
#include <library/cpp/threading/future/future.h>
-
+
#include <util/generic/cast.h>
-#include <util/generic/fwd.h>
-#include <util/generic/noncopyable.h>
+#include <util/generic/fwd.h>
+#include <util/generic/noncopyable.h>
#include <util/generic/ptr.h>
-#include <util/generic/singleton.h>
+#include <util/generic/singleton.h>
#include <util/generic/ymath.h>
-
+
#include <functional>
namespace NPar {
struct ILocallyExecutable : virtual public TThrRefBase {
- // Must be implemented by the end user to define job that will be processed by one of
- // executor threads.
- //
- // @param id Job parameter, typically an index pointing somewhere in array, or just
- // some dummy value, e.g. `0`.
+ // Must be implemented by the end user to define job that will be processed by one of
+ // executor threads.
+ //
+ // @param id Job parameter, typically an index pointing somewhere in array, or just
+ // some dummy value, e.g. `0`.
virtual void LocalExec(int id) = 0;
};
@@ -31,7 +31,7 @@ namespace NPar {
ILocalExecutor() = default;
virtual ~ILocalExecutor() = default;
- enum EFlags : int {
+ enum EFlags : int {
HIGH_PRIORITY = 0,
MED_PRIORITY = 1,
LOW_PRIORITY = 2,
@@ -58,8 +58,8 @@ namespace NPar {
virtual int GetWorkerThreadId() const noexcept = 0;
virtual int GetThreadCount() const noexcept = 0;
- // Describes a range of tasks with parameters from integer range [FirstId, LastId).
- //
+ // Describes a range of tasks with parameters from integer range [FirstId, LastId).
+ //
class TExecRangeParams {
public:
template <typename TFirst, typename TLast>
@@ -70,9 +70,9 @@ namespace NPar {
Y_ASSERT(LastId >= FirstId);
SetBlockSize(1);
}
- // Partition tasks into `blockCount` blocks of approximately equal size, each of which
- // will be executed as a separate bigger task.
- //
+ // Partition tasks into `blockCount` blocks of approximately equal size, each of which
+ // will be executed as a separate bigger task.
+ //
template <typename TBlockCount>
TExecRangeParams& SetBlockCount(TBlockCount blockCount) {
Y_ASSERT(SafeIntegerCast<int>(blockCount) > 0 || FirstId == LastId);
@@ -81,9 +81,9 @@ namespace NPar {
BlockEqualToThreads = false;
return *this;
}
- // Partition tasks into blocks of approximately `blockSize` size, each of which will
- // be executed as a separate bigger task.
- //
+ // Partition tasks into blocks of approximately `blockSize` size, each of which will
+ // be executed as a separate bigger task.
+ //
template <typename TBlockSize>
TExecRangeParams& SetBlockSize(TBlockSize blockSize) {
Y_ASSERT(SafeIntegerCast<int>(blockSize) > 0 || FirstId == LastId);
@@ -92,9 +92,9 @@ namespace NPar {
BlockEqualToThreads = false;
return *this;
}
- // Partition tasks into thread count blocks of approximately equal size, each of which
- // will be executed as a separate bigger task.
- //
+ // Partition tasks into thread count blocks of approximately equal size, each of which
+ // will be executed as a separate bigger task.
+ //
TExecRangeParams& SetBlockCountToThreadCount() {
BlockEqualToThreads = true;
return *this;
@@ -107,9 +107,9 @@ namespace NPar {
Y_ASSERT(!BlockEqualToThreads);
return BlockSize;
}
- bool GetBlockEqualToThreads() {
- return BlockEqualToThreads;
- }
+ bool GetBlockEqualToThreads() {
+ return BlockEqualToThreads;
+ }
const int FirstId = 0;
const int LastId = 0;
@@ -120,26 +120,26 @@ namespace NPar {
bool BlockEqualToThreads;
};
- // `Exec` and `ExecRange` versions that accept functions.
- //
- void Exec(TLocallyExecutableFunction exec, int id, int flags);
- void ExecRange(TLocallyExecutableFunction exec, int firstId, int lastId, int flags);
-
- // Version of `ExecRange` that throws exception from task with minimal id if at least one of
- // task threw an exception.
- //
- void ExecRangeWithThrow(TLocallyExecutableFunction exec, int firstId, int lastId, int flags);
-
- // Version of `ExecRange` that returns vector of futures, thus allowing to retry any task if
- // it fails.
- //
- TVector<NThreading::TFuture<void>> ExecRangeWithFutures(TLocallyExecutableFunction exec, int firstId, int lastId, int flags);
-
+ // `Exec` and `ExecRange` versions that accept functions.
+ //
+ void Exec(TLocallyExecutableFunction exec, int id, int flags);
+ void ExecRange(TLocallyExecutableFunction exec, int firstId, int lastId, int flags);
+
+ // Version of `ExecRange` that throws exception from task with minimal id if at least one of
+ // task threw an exception.
+ //
+ void ExecRangeWithThrow(TLocallyExecutableFunction exec, int firstId, int lastId, int flags);
+
+ // Version of `ExecRange` that returns vector of futures, thus allowing to retry any task if
+ // it fails.
+ //
+ TVector<NThreading::TFuture<void>> ExecRangeWithFutures(TLocallyExecutableFunction exec, int firstId, int lastId, int flags);
+
template <typename TBody>
static inline auto BlockedLoopBody(const TExecRangeParams& params, const TBody& body) {
return [=](int blockId) {
- const int blockFirstId = params.FirstId + blockId * params.GetBlockSize();
- const int blockLastId = Min(params.LastId, blockFirstId + params.GetBlockSize());
+ const int blockFirstId = params.FirstId + blockId * params.GetBlockSize();
+ const int blockLastId = Min(params.LastId, blockFirstId + params.GetBlockSize());
for (int i = blockFirstId; i < blockLastId; ++i) {
body(i);
}
@@ -151,10 +151,10 @@ namespace NPar {
if (TryExecRangeSequentially(body, params.FirstId, params.LastId, flags)) {
return;
}
- if (params.GetBlockEqualToThreads()) {
- params.SetBlockCount(GetThreadCount() + ((flags & WAIT_COMPLETE) != 0)); // ThreadCount or ThreadCount+1 depending on WaitFlag
+ if (params.GetBlockEqualToThreads()) {
+ params.SetBlockCount(GetThreadCount() + ((flags & WAIT_COMPLETE) != 0)); // ThreadCount or ThreadCount+1 depending on WaitFlag
}
- ExecRange(BlockedLoopBody(params, body), 0, params.GetBlockCount(), flags);
+ ExecRange(BlockedLoopBody(params, body), 0, params.GetBlockCount(), flags);
}
template <typename TBody>
@@ -269,7 +269,7 @@ namespace NPar {
THolder<TImpl> Impl_;
};
- static inline TLocalExecutor& LocalExecutor() {
+ static inline TLocalExecutor& LocalExecutor() {
return *Singleton<TLocalExecutor>();
}
diff --git a/library/cpp/threading/local_executor/ut/local_executor_ut.cpp b/library/cpp/threading/local_executor/ut/local_executor_ut.cpp
index ac5737717c..fe7dab0899 100644
--- a/library/cpp/threading/local_executor/ut/local_executor_ut.cpp
+++ b/library/cpp/threading/local_executor/ut/local_executor_ut.cpp
@@ -1,10 +1,10 @@
#include <library/cpp/threading/local_executor/local_executor.h>
#include <library/cpp/threading/future/future.h>
-
+
#include <library/cpp/testing/unittest/registar.h>
#include <util/system/mutex.h>
#include <util/system/rwlock.h>
-#include <util/generic/algorithm.h>
+#include <util/generic/algorithm.h>
using namespace NPar;
@@ -14,7 +14,7 @@ class TTestException: public yexception {
static const int DefaultThreadsCount = 41;
static const int DefaultRangeSize = 999;
-Y_UNIT_TEST_SUITE(ExecRangeWithFutures){
+Y_UNIT_TEST_SUITE(ExecRangeWithFutures){
bool AllOf(const TVector<int>& vec, int value){
return AllOf(vec, [value](int element) { return value == element; });
}
@@ -41,23 +41,23 @@ void AsyncRunAndWaitFuturesReady(int rangeSize, int threads) {
UNIT_ASSERT(AllOf(data, 1));
}
-Y_UNIT_TEST(AsyncRunRangeAndWaitFuturesReady) {
+Y_UNIT_TEST(AsyncRunRangeAndWaitFuturesReady) {
AsyncRunAndWaitFuturesReady(DefaultRangeSize, DefaultThreadsCount);
}
-Y_UNIT_TEST(AsyncRunOneTaskAndWaitFuturesReady) {
+Y_UNIT_TEST(AsyncRunOneTaskAndWaitFuturesReady) {
AsyncRunAndWaitFuturesReady(1, DefaultThreadsCount);
}
-Y_UNIT_TEST(AsyncRunRangeAndWaitFuturesReadyOneExtraThread) {
+Y_UNIT_TEST(AsyncRunRangeAndWaitFuturesReadyOneExtraThread) {
AsyncRunAndWaitFuturesReady(DefaultRangeSize, 1);
}
-Y_UNIT_TEST(AsyncRunOneThreadAndWaitFuturesReadyOneExtraThread) {
+Y_UNIT_TEST(AsyncRunOneThreadAndWaitFuturesReadyOneExtraThread) {
AsyncRunAndWaitFuturesReady(1, 1);
}
-Y_UNIT_TEST(AsyncRunTwoRangesAndWaitFuturesReady) {
+Y_UNIT_TEST(AsyncRunTwoRangesAndWaitFuturesReady) {
TLocalExecutor localExecutor;
localExecutor.RunAdditionalThreads(DefaultThreadsCount);
TAtomic signal = 0;
@@ -118,23 +118,23 @@ void AsyncRunRangeAndWaitExceptions(int rangeSize, int threadsCount) {
UNIT_ASSERT(AllOf(data, 1));
}
-Y_UNIT_TEST(AsyncRunRangeAndWaitExceptions) {
+Y_UNIT_TEST(AsyncRunRangeAndWaitExceptions) {
AsyncRunRangeAndWaitExceptions(DefaultRangeSize, DefaultThreadsCount);
}
-Y_UNIT_TEST(AsyncRunOneTaskAndWaitExceptions) {
+Y_UNIT_TEST(AsyncRunOneTaskAndWaitExceptions) {
AsyncRunRangeAndWaitExceptions(1, DefaultThreadsCount);
}
-Y_UNIT_TEST(AsyncRunRangeAndWaitExceptionsOneExtraThread) {
+Y_UNIT_TEST(AsyncRunRangeAndWaitExceptionsOneExtraThread) {
AsyncRunRangeAndWaitExceptions(DefaultRangeSize, 1);
}
-Y_UNIT_TEST(AsyncRunOneTaskAndWaitExceptionsOneExtraThread) {
+Y_UNIT_TEST(AsyncRunOneTaskAndWaitExceptionsOneExtraThread) {
AsyncRunRangeAndWaitExceptions(1, 1);
}
-Y_UNIT_TEST(AsyncRunTwoRangesAndWaitExceptions) {
+Y_UNIT_TEST(AsyncRunTwoRangesAndWaitExceptions) {
TLocalExecutor localExecutor;
localExecutor.RunAdditionalThreads(DefaultThreadsCount);
TAtomic signal = 0;
@@ -209,33 +209,33 @@ void RunRangeAndCheckExceptionsWithWaitComplete(int rangeSize, int threadsCount)
UNIT_ASSERT(AllOf(data, 1));
}
-Y_UNIT_TEST(RunRangeAndCheckExceptionsWithWaitComplete) {
+Y_UNIT_TEST(RunRangeAndCheckExceptionsWithWaitComplete) {
RunRangeAndCheckExceptionsWithWaitComplete(DefaultRangeSize, DefaultThreadsCount);
}
-Y_UNIT_TEST(RunOneAndCheckExceptionsWithWaitComplete) {
+Y_UNIT_TEST(RunOneAndCheckExceptionsWithWaitComplete) {
RunRangeAndCheckExceptionsWithWaitComplete(1, DefaultThreadsCount);
}
-Y_UNIT_TEST(RunRangeAndCheckExceptionsWithWaitCompleteOneExtraThread) {
+Y_UNIT_TEST(RunRangeAndCheckExceptionsWithWaitCompleteOneExtraThread) {
RunRangeAndCheckExceptionsWithWaitComplete(DefaultRangeSize, 1);
}
-Y_UNIT_TEST(RunOneAndCheckExceptionsWithWaitCompleteOneExtraThread) {
+Y_UNIT_TEST(RunOneAndCheckExceptionsWithWaitCompleteOneExtraThread) {
RunRangeAndCheckExceptionsWithWaitComplete(1, 1);
}
-Y_UNIT_TEST(RunRangeAndCheckExceptionsWithWaitCompleteZeroExtraThreads) {
+Y_UNIT_TEST(RunRangeAndCheckExceptionsWithWaitCompleteZeroExtraThreads) {
RunRangeAndCheckExceptionsWithWaitComplete(DefaultRangeSize, 0);
}
-Y_UNIT_TEST(RunOneAndCheckExceptionsWithWaitCompleteZeroExtraThreads) {
+Y_UNIT_TEST(RunOneAndCheckExceptionsWithWaitCompleteZeroExtraThreads) {
RunRangeAndCheckExceptionsWithWaitComplete(1, 0);
}
}
;
-Y_UNIT_TEST_SUITE(ExecRangeWithThrow){
+Y_UNIT_TEST_SUITE(ExecRangeWithThrow){
void RunParallelWhichThrowsTTestException(int rangeStart, int rangeSize, int threadsCount, int flags, TAtomic& processed){
AtomicSet(processed, 0);
TLocalExecutor localExecutor;
@@ -247,7 +247,7 @@ localExecutor.ExecRangeWithThrow([&processed](int) {
rangeStart, rangeStart + rangeSize, flags);
}
-Y_UNIT_TEST(RunParallelWhichThrowsTTestException) {
+Y_UNIT_TEST(RunParallelWhichThrowsTTestException) {
TAtomic processed = 0;
UNIT_ASSERT_EXCEPTION(
RunParallelWhichThrowsTTestException(10, 40, DefaultThreadsCount,
@@ -264,32 +264,32 @@ void ThrowAndCatchTTestException(int rangeSize, int threadsCount, int flags) {
UNIT_ASSERT(AtomicGet(processed) == rangeSize);
}
-Y_UNIT_TEST(ThrowAndCatchTTestExceptionLowPriority) {
+Y_UNIT_TEST(ThrowAndCatchTTestExceptionLowPriority) {
ThrowAndCatchTTestException(DefaultRangeSize, DefaultThreadsCount,
TLocalExecutor::EFlags::WAIT_COMPLETE | TLocalExecutor::EFlags::LOW_PRIORITY);
}
-Y_UNIT_TEST(ThrowAndCatchTTestExceptionMedPriority) {
+Y_UNIT_TEST(ThrowAndCatchTTestExceptionMedPriority) {
ThrowAndCatchTTestException(DefaultRangeSize, DefaultThreadsCount,
TLocalExecutor::EFlags::WAIT_COMPLETE | TLocalExecutor::EFlags::MED_PRIORITY);
}
-Y_UNIT_TEST(ThrowAndCatchTTestExceptionHighPriority) {
+Y_UNIT_TEST(ThrowAndCatchTTestExceptionHighPriority) {
ThrowAndCatchTTestException(DefaultRangeSize, DefaultThreadsCount,
TLocalExecutor::EFlags::WAIT_COMPLETE | TLocalExecutor::EFlags::HIGH_PRIORITY);
}
-Y_UNIT_TEST(ThrowAndCatchTTestExceptionWaitComplete) {
+Y_UNIT_TEST(ThrowAndCatchTTestExceptionWaitComplete) {
ThrowAndCatchTTestException(DefaultRangeSize, DefaultThreadsCount,
TLocalExecutor::EFlags::WAIT_COMPLETE);
}
-Y_UNIT_TEST(RethrowExeptionSequentialWaitComplete) {
+Y_UNIT_TEST(RethrowExeptionSequentialWaitComplete) {
ThrowAndCatchTTestException(DefaultRangeSize, 0,
TLocalExecutor::EFlags::WAIT_COMPLETE);
}
-Y_UNIT_TEST(RethrowExeptionOneExtraThreadWaitComplete) {
+Y_UNIT_TEST(RethrowExeptionOneExtraThreadWaitComplete) {
ThrowAndCatchTTestException(DefaultRangeSize, 1,
TLocalExecutor::EFlags::WAIT_COMPLETE);
}
@@ -314,7 +314,7 @@ void CatchTTestExceptionFromNested(TAtomic& processed1, TAtomic& processed2) {
0, DefaultRangeSize, TLocalExecutor::EFlags::WAIT_COMPLETE);
}
-Y_UNIT_TEST(NestedParallelExceptionsDoNotLeak) {
+Y_UNIT_TEST(NestedParallelExceptionsDoNotLeak) {
TAtomic processed1 = 0;
TAtomic processed2 = 0;
UNIT_ASSERT_NO_EXCEPTION(
diff --git a/library/cpp/threading/local_executor/ut/ya.make b/library/cpp/threading/local_executor/ut/ya.make
index be579a5ca0..2983c4f466 100644
--- a/library/cpp/threading/local_executor/ut/ya.make
+++ b/library/cpp/threading/local_executor/ut/ya.make
@@ -1,10 +1,10 @@
OWNER(
g:matrixnet
- gulin
-)
+ gulin
+)
UNITTEST_FOR(library/cpp/threading/local_executor)
-
+
SRCS(
local_executor_ut.cpp
)
diff --git a/library/cpp/threading/local_executor/ya.make b/library/cpp/threading/local_executor/ya.make
index df210f92bb..516be66703 100644
--- a/library/cpp/threading/local_executor/ya.make
+++ b/library/cpp/threading/local_executor/ya.make
@@ -5,8 +5,8 @@ OWNER(
espetrov
)
-LIBRARY()
-
+LIBRARY()
+
SRCS(
local_executor.cpp
tbb_local_executor.cpp
diff --git a/library/cpp/threading/poor_man_openmp/thread_helper_ut.cpp b/library/cpp/threading/poor_man_openmp/thread_helper_ut.cpp
index 7417636864..0f91c1ce4a 100644
--- a/library/cpp/threading/poor_man_openmp/thread_helper_ut.cpp
+++ b/library/cpp/threading/poor_man_openmp/thread_helper_ut.cpp
@@ -5,8 +5,8 @@
#include <util/generic/string.h>
#include <util/generic/yexception.h>
-Y_UNIT_TEST_SUITE(TestMP) {
- Y_UNIT_TEST(TestErr) {
+Y_UNIT_TEST_SUITE(TestMP) {
+ Y_UNIT_TEST(TestErr) {
std::function<void(int)> f = [](int x) {
if (x == 5) {
ythrow yexception() << "oops";
diff --git a/library/cpp/threading/queue/basic_ut.cpp b/library/cpp/threading/queue/basic_ut.cpp
index 5f56f8583e..a52b46c8a6 100644
--- a/library/cpp/threading/queue/basic_ut.cpp
+++ b/library/cpp/threading/queue/basic_ut.cpp
@@ -51,7 +51,7 @@ public:
template <size_t NUMBER_OF_THREADS>
void RepeatPush1Pop1_InManyThreads() {
- class TCycleThread: public ISimpleThread {
+ class TCycleThread: public ISimpleThread {
public:
void* ThreadProc() override {
TQueueType queue;
diff --git a/library/cpp/threading/queue/queue_ut.cpp b/library/cpp/threading/queue/queue_ut.cpp
index 80eca147da..eb77e51e19 100644
--- a/library/cpp/threading/queue/queue_ut.cpp
+++ b/library/cpp/threading/queue/queue_ut.cpp
@@ -43,7 +43,7 @@ public:
void Threads2_Push1M_Threads1_Pop2M() {
TQueueType queue;
- class TPusherThread: public ISimpleThread {
+ class TPusherThread: public ISimpleThread {
public:
TPusherThread(TQueueType& theQueue, char* start)
: Queue(theQueue)
@@ -81,7 +81,7 @@ public:
void Threads4_Push1M_Threads1_Pop4M() {
TQueueType queue;
- class TPusherThread: public ISimpleThread {
+ class TPusherThread: public ISimpleThread {
public:
TPusherThread(TQueueType& theQueue, char* start)
: Queue(theQueue)
@@ -124,7 +124,7 @@ public:
void ManyRndPush100K_ManyQueues() {
TQueueType queue[NUMBER_OF_QUEUES];
- class TPusherThread: public ISimpleThread {
+ class TPusherThread: public ISimpleThread {
public:
TPusherThread(TQueueType* queues, char* start)
: Queues(queues)
@@ -155,7 +155,7 @@ public:
}
};
- class TPopperThread: public ISimpleThread {
+ class TPopperThread: public ISimpleThread {
public:
TPopperThread(TQueueType* theQueue, char* base)
: Queue(theQueue)
diff --git a/library/cpp/threading/queue/tune_ut.cpp b/library/cpp/threading/queue/tune_ut.cpp
index 7e980d3e27..34086ccf0f 100644
--- a/library/cpp/threading/queue/tune_ut.cpp
+++ b/library/cpp/threading/queue/tune_ut.cpp
@@ -19,8 +19,8 @@ DeclareTuneTypeParam(TweakStructB, TStructB);
DeclareTuneValueParam(TweakParam1, ui32, Param1);
DeclareTuneValueParam(TweakParam2, ui32, Param2);
-Y_UNIT_TEST_SUITE(TestTuning) {
- Y_UNIT_TEST(Defaults) {
+Y_UNIT_TEST_SUITE(TestTuning) {
+ Y_UNIT_TEST(Defaults) {
using TTuned = TTune<TDefaults>;
using TunedA = TTuned::TStructA;
using TunedB = TTuned::TStructB;
@@ -35,7 +35,7 @@ Y_UNIT_TEST_SUITE(TestTuning) {
UNIT_ASSERT_EQUAL(param2, 42);
}
- Y_UNIT_TEST(TuneStructA) {
+ Y_UNIT_TEST(TuneStructA) {
struct TMyStruct {
};
@@ -56,7 +56,7 @@ Y_UNIT_TEST_SUITE(TestTuning) {
UNIT_ASSERT_EQUAL(param2, 42);
}
- Y_UNIT_TEST(TuneParam1) {
+ Y_UNIT_TEST(TuneParam1) {
using TTuned = TTune<TDefaults, TweakParam1<24>>;
using TunedA = TTuned::TStructA;
@@ -72,7 +72,7 @@ Y_UNIT_TEST_SUITE(TestTuning) {
UNIT_ASSERT_EQUAL(param2, 42);
}
- Y_UNIT_TEST(TuneStructAAndParam1) {
+ Y_UNIT_TEST(TuneStructAAndParam1) {
struct TMyStruct {
};
@@ -94,7 +94,7 @@ Y_UNIT_TEST_SUITE(TestTuning) {
UNIT_ASSERT_EQUAL(param2, 42);
}
- Y_UNIT_TEST(TuneParam1AndStructA) {
+ Y_UNIT_TEST(TuneParam1AndStructA) {
struct TMyStruct {
};
diff --git a/library/cpp/threading/queue/unordered_ut.cpp b/library/cpp/threading/queue/unordered_ut.cpp
index a43b7f520e..1310559c46 100644
--- a/library/cpp/threading/queue/unordered_ut.cpp
+++ b/library/cpp/threading/queue/unordered_ut.cpp
@@ -56,7 +56,7 @@ public:
void ManyThreadsRndExchange() {
TQueueType queues[COUNT];
- class TWorker: public ISimpleThread {
+ class TWorker: public ISimpleThread {
public:
TWorker(
TQueueType* queues_,
diff --git a/library/cpp/threading/skip_list/skiplist_ut.cpp b/library/cpp/threading/skip_list/skiplist_ut.cpp
index 52fcffda66..9c483de136 100644
--- a/library/cpp/threading/skip_list/skiplist_ut.cpp
+++ b/library/cpp/threading/skip_list/skiplist_ut.cpp
@@ -35,15 +35,15 @@ namespace NThreading {
////////////////////////////////////////////////////////////////////////////////
- Y_UNIT_TEST_SUITE(TSkipListTest) {
- Y_UNIT_TEST(ShouldBeEmptyAfterCreation) {
+ Y_UNIT_TEST_SUITE(TSkipListTest) {
+ Y_UNIT_TEST(ShouldBeEmptyAfterCreation) {
TMemoryPool pool(1024);
TSkipList<int> list(pool);
UNIT_ASSERT_EQUAL(list.GetSize(), 0);
}
- Y_UNIT_TEST(ShouldAllowInsertion) {
+ Y_UNIT_TEST(ShouldAllowInsertion) {
TMemoryPool pool(1024);
TSkipList<int> list(pool);
@@ -51,7 +51,7 @@ namespace NThreading {
UNIT_ASSERT_EQUAL(list.GetSize(), 1);
}
- Y_UNIT_TEST(ShouldNotAllowDuplicates) {
+ Y_UNIT_TEST(ShouldNotAllowDuplicates) {
TMemoryPool pool(1024);
TSkipList<int> list(pool);
@@ -62,7 +62,7 @@ namespace NThreading {
UNIT_ASSERT_EQUAL(list.GetSize(), 1);
}
- Y_UNIT_TEST(ShouldContainInsertedItem) {
+ Y_UNIT_TEST(ShouldContainInsertedItem) {
TMemoryPool pool(1024);
TSkipList<int> list(pool);
@@ -70,7 +70,7 @@ namespace NThreading {
UNIT_ASSERT(list.Contains(12345678));
}
- Y_UNIT_TEST(ShouldNotContainNotInsertedItem) {
+ Y_UNIT_TEST(ShouldNotContainNotInsertedItem) {
TMemoryPool pool(1024);
TSkipList<int> list(pool);
@@ -78,7 +78,7 @@ namespace NThreading {
UNIT_ASSERT(!list.Contains(87654321));
}
- Y_UNIT_TEST(ShouldIterateAllItems) {
+ Y_UNIT_TEST(ShouldIterateAllItems) {
TMemoryPool pool(1024);
TSkipList<int> list(pool);
@@ -95,7 +95,7 @@ namespace NThreading {
UNIT_ASSERT(!it.IsValid());
}
- Y_UNIT_TEST(ShouldIterateAllItemsInReverseDirection) {
+ Y_UNIT_TEST(ShouldIterateAllItemsInReverseDirection) {
TMemoryPool pool(1024);
TSkipList<int> list(pool);
@@ -112,7 +112,7 @@ namespace NThreading {
UNIT_ASSERT(!it.IsValid());
}
- Y_UNIT_TEST(ShouldSeekToFirstItem) {
+ Y_UNIT_TEST(ShouldSeekToFirstItem) {
TMemoryPool pool(1024);
TSkipList<int> list(pool);
@@ -125,7 +125,7 @@ namespace NThreading {
UNIT_ASSERT_EQUAL(it.GetValue(), 1);
}
- Y_UNIT_TEST(ShouldSeekToLastItem) {
+ Y_UNIT_TEST(ShouldSeekToLastItem) {
TMemoryPool pool(1024);
TSkipList<int> list(pool);
@@ -138,7 +138,7 @@ namespace NThreading {
UNIT_ASSERT_EQUAL(it.GetValue(), 9);
}
- Y_UNIT_TEST(ShouldSeekToExistingItem) {
+ Y_UNIT_TEST(ShouldSeekToExistingItem) {
TMemoryPool pool(1024);
TSkipList<int> list(pool);
@@ -148,7 +148,7 @@ namespace NThreading {
UNIT_ASSERT(it.IsValid());
}
- Y_UNIT_TEST(ShouldSeekAfterMissedItem) {
+ Y_UNIT_TEST(ShouldSeekAfterMissedItem) {
TMemoryPool pool(1024);
TSkipList<int> list(pool);
@@ -164,7 +164,7 @@ namespace NThreading {
UNIT_ASSERT_EQUAL(it.GetValue(), 100);
}
- Y_UNIT_TEST(ShouldCallDtorsOfNonPodTypes) {
+ Y_UNIT_TEST(ShouldCallDtorsOfNonPodTypes) {
UNIT_ASSERT(!TTypeTraits<TTestObject>::IsPod);
UNIT_ASSERT_EQUAL(TTestObject::Count, 0);
diff --git a/library/cpp/threading/task_scheduler/task_scheduler.cpp b/library/cpp/threading/task_scheduler/task_scheduler.cpp
index 174dde4bf7..95bd27d7cf 100644
--- a/library/cpp/threading/task_scheduler/task_scheduler.cpp
+++ b/library/cpp/threading/task_scheduler/task_scheduler.cpp
@@ -2,7 +2,7 @@
#include <util/system/thread.h>
#include <util/string/cast.h>
-#include <util/stream/output.h>
+#include <util/stream/output.h>
TTaskScheduler::ITask::~ITask() {}
TTaskScheduler::IRepeatedTask::~IRepeatedTask() {}
@@ -10,7 +10,7 @@ TTaskScheduler::IRepeatedTask::~IRepeatedTask() {}
class TTaskScheduler::TWorkerThread
- : public ISimpleThread
+ : public ISimpleThread
{
public:
TWorkerThread(TTaskScheduler& state)
@@ -152,8 +152,8 @@ const bool debugOutput = false;
void TTaskScheduler::ChangeDebugState(TWorkerThread* thread, const TString& state) {
if (!debugOutput) {
- Y_UNUSED(thread);
- Y_UNUSED(state);
+ Y_UNUSED(thread);
+ Y_UNUSED(state);
return;
}
diff --git a/library/cpp/threading/task_scheduler/task_scheduler_ut.cpp b/library/cpp/threading/task_scheduler/task_scheduler_ut.cpp
index 3b5203194a..8f21984b77 100644
--- a/library/cpp/threading/task_scheduler/task_scheduler_ut.cpp
+++ b/library/cpp/threading/task_scheduler/task_scheduler_ut.cpp
@@ -1,7 +1,7 @@
#include <algorithm>
#include <library/cpp/testing/unittest/registar.h>
-#include <util/stream/output.h>
+#include <util/stream/output.h>
#include <util/system/atomic.h>
#include <util/generic/vector.h>