aboutsummaryrefslogtreecommitdiffstats
path: root/util/thread
diff options
context:
space:
mode:
authorAnton Samokhvalov <pg83@yandex.ru>2022-02-10 16:45:15 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:45:15 +0300
commit72cb13b4aff9bc9cf22e49251bc8fd143f82538f (patch)
treeda2c34829458c7d4e74bdfbdf85dff449e9e7fb8 /util/thread
parent778e51ba091dc39e7b7fcab2b9cf4dbedfb6f2b5 (diff)
downloadydb-72cb13b4aff9bc9cf22e49251bc8fd143f82538f.tar.gz
Restoring authorship annotation for Anton Samokhvalov <pg83@yandex.ru>. Commit 1 of 2.
Diffstat (limited to 'util/thread')
-rw-r--r--util/thread/factory.cpp128
-rw-r--r--util/thread/factory.h92
-rw-r--r--util/thread/factory_ut.cpp96
-rw-r--r--util/thread/lfqueue.cpp2
-rw-r--r--util/thread/lfqueue.h98
-rw-r--r--util/thread/lfqueue_ut.cpp18
-rw-r--r--util/thread/lfstack.cpp2
-rw-r--r--util/thread/lfstack.h66
-rw-r--r--util/thread/lfstack_ut.cpp28
-rw-r--r--util/thread/pool.cpp990
-rw-r--r--util/thread/pool.h260
-rw-r--r--util/thread/pool_ut.cpp88
-rw-r--r--util/thread/singleton.cpp2
-rw-r--r--util/thread/singleton.h48
-rw-r--r--util/thread/singleton_ut.cpp4
-rw-r--r--util/thread/ut/ya.make8
16 files changed, 965 insertions, 965 deletions
diff --git a/util/thread/factory.cpp b/util/thread/factory.cpp
index 48e898f32d..0de161f007 100644
--- a/util/thread/factory.cpp
+++ b/util/thread/factory.cpp
@@ -1,71 +1,71 @@
#include "factory.h"
-
-#include <util/system/thread.h>
-#include <util/generic/singleton.h>
-
+
+#include <util/system/thread.h>
+#include <util/generic/singleton.h>
+
using IThread = IThreadFactory::IThread;
-
-namespace {
+
+namespace {
class TSystemThreadFactory: public IThreadFactory {
- public:
- class TPoolThread: public IThread {
- public:
+ public:
+ class TPoolThread: public IThread {
+ public:
~TPoolThread() override {
- if (Thr_) {
- Thr_->Detach();
- }
- }
-
+ if (Thr_) {
+ Thr_->Detach();
+ }
+ }
+
void DoRun(IThreadAble* func) override {
- Thr_.Reset(new TThread(ThreadProc, func));
-
- Thr_->Start();
- }
-
+ Thr_.Reset(new TThread(ThreadProc, func));
+
+ Thr_->Start();
+ }
+
void DoJoin() noexcept override {
- if (!Thr_) {
- return;
- }
-
- Thr_->Join();
- Thr_.Destroy();
- }
-
- private:
- static void* ThreadProc(void* func) {
- ((IThreadAble*)(func))->Execute();
-
+ if (!Thr_) {
+ return;
+ }
+
+ Thr_->Join();
+ Thr_.Destroy();
+ }
+
+ private:
+ static void* ThreadProc(void* func) {
+ ((IThreadAble*)(func))->Execute();
+
return nullptr;
- }
-
- private:
- THolder<TThread> Thr_;
- };
-
+ }
+
+ private:
+ THolder<TThread> Thr_;
+ };
+
inline TSystemThreadFactory() noexcept {
- }
-
+ }
+
IThread* DoCreate() override {
- return new TPoolThread;
- }
- };
+ return new TPoolThread;
+ }
+ };
class TThreadFactoryFuncObj: public IThreadFactory::IThreadAble {
- public:
+ public:
TThreadFactoryFuncObj(const std::function<void()>& func)
- : Func(func)
- {
- }
+ : Func(func)
+ {
+ }
void DoExecute() override {
THolder<TThreadFactoryFuncObj> self(this);
- Func();
- }
-
- private:
+ Func();
+ }
+
+ private:
std::function<void()> Func;
};
-}
-
+}
+
THolder<IThread> IThreadFactory::Run(std::function<void()> func) {
THolder<IThread> ret(DoCreate());
@@ -76,18 +76,18 @@ THolder<IThread> IThreadFactory::Run(std::function<void()> func) {
static IThreadFactory* SystemThreadPoolImpl() {
return Singleton<TSystemThreadFactory>();
-}
-
+}
+
static IThreadFactory* systemPool = nullptr;
-
+
IThreadFactory* SystemThreadFactory() {
- if (systemPool) {
- return systemPool;
- }
-
- return SystemThreadPoolImpl();
-}
-
+ if (systemPool) {
+ return systemPool;
+ }
+
+ return SystemThreadPoolImpl();
+}
+
void SetSystemThreadFactory(IThreadFactory* pool) {
- systemPool = pool;
-}
+ systemPool = pool;
+}
diff --git a/util/thread/factory.h b/util/thread/factory.h
index 561fcbac88..9393f0f309 100644
--- a/util/thread/factory.h
+++ b/util/thread/factory.h
@@ -1,65 +1,65 @@
#pragma once
-
+
#include <util/generic/ptr.h>
#include <functional>
-
+
class IThreadFactory {
-public:
- class IThreadAble {
- public:
+public:
+ class IThreadAble {
+ public:
inline IThreadAble() noexcept = default;
-
+
virtual ~IThreadAble() = default;
-
- inline void Execute() {
- DoExecute();
- }
-
- private:
- virtual void DoExecute() = 0;
- };
-
- class IThread {
+
+ inline void Execute() {
+ DoExecute();
+ }
+
+ private:
+ virtual void DoExecute() = 0;
+ };
+
+ class IThread {
friend class IThreadFactory;
-
- public:
+
+ public:
inline IThread() noexcept = default;
-
+
virtual ~IThread() = default;
-
+
inline void Join() noexcept {
- DoJoin();
- }
-
- private:
- inline void Run(IThreadAble* func) {
- DoRun(func);
- }
-
- private:
- // it's actually DoStart
- virtual void DoRun(IThreadAble* func) = 0;
+ DoJoin();
+ }
+
+ private:
+ inline void Run(IThreadAble* func) {
+ DoRun(func);
+ }
+
+ private:
+ // it's actually DoStart
+ virtual void DoRun(IThreadAble* func) = 0;
virtual void DoJoin() noexcept = 0;
- };
-
+ };
+
inline IThreadFactory() noexcept = default;
-
+
virtual ~IThreadFactory() = default;
-
- // XXX: rename to Start
+
+ // XXX: rename to Start
inline THolder<IThread> Run(IThreadAble* func) {
THolder<IThread> ret(DoCreate());
- ret->Run(func);
-
- return ret;
- }
-
+ ret->Run(func);
+
+ return ret;
+ }
+
THolder<IThread> Run(std::function<void()> func);
-
-private:
- virtual IThread* DoCreate() = 0;
-};
-
+
+private:
+ virtual IThread* DoCreate() = 0;
+};
+
IThreadFactory* SystemThreadFactory();
void SetSystemThreadFactory(IThreadFactory* pool);
diff --git a/util/thread/factory_ut.cpp b/util/thread/factory_ut.cpp
index 647d96c901..f506ce7e06 100644
--- a/util/thread/factory_ut.cpp
+++ b/util/thread/factory_ut.cpp
@@ -1,57 +1,57 @@
#include "factory.h"
#include "pool.h"
-
+
#include <library/cpp/testing/unittest/registar.h>
-
-class TThrPoolTest: public TTestBase {
- UNIT_TEST_SUITE(TThrPoolTest);
- UNIT_TEST(TestSystemPool)
- UNIT_TEST(TestAdaptivePool)
- UNIT_TEST_SUITE_END();
-
+
+class TThrPoolTest: public TTestBase {
+ UNIT_TEST_SUITE(TThrPoolTest);
+ UNIT_TEST(TestSystemPool)
+ UNIT_TEST(TestAdaptivePool)
+ UNIT_TEST_SUITE_END();
+
struct TRunAble: public IThreadFactory::IThreadAble {
- inline TRunAble()
- : done(false)
- {
- }
-
+ inline TRunAble()
+ : done(false)
+ {
+ }
+
~TRunAble() override = default;
-
+
void DoExecute() override {
- done = true;
- }
-
- bool done;
- };
-
-private:
- inline void TestSystemPool() {
- TRunAble r;
-
- {
+ done = true;
+ }
+
+ bool done;
+ };
+
+private:
+ inline void TestSystemPool() {
+ TRunAble r;
+
+ {
THolder<IThreadFactory::IThread> thr = SystemThreadFactory()->Run(&r);
-
- thr->Join();
- }
-
- UNIT_ASSERT_EQUAL(r.done, true);
- }
-
- inline void TestAdaptivePool() {
- TRunAble r;
-
- {
+
+ thr->Join();
+ }
+
+ UNIT_ASSERT_EQUAL(r.done, true);
+ }
+
+ inline void TestAdaptivePool() {
+ TRunAble r;
+
+ {
TAdaptiveThreadPool pool;
-
- pool.Start(0);
-
+
+ pool.Start(0);
+
THolder<IThreadFactory::IThread> thr = pool.Run(&r);
-
- thr->Join();
- }
-
- UNIT_ASSERT_EQUAL(r.done, true);
- }
-};
-
-UNIT_TEST_SUITE_REGISTRATION(TThrPoolTest);
+
+ thr->Join();
+ }
+
+ UNIT_ASSERT_EQUAL(r.done, true);
+ }
+};
+
+UNIT_TEST_SUITE_REGISTRATION(TThrPoolTest);
diff --git a/util/thread/lfqueue.cpp b/util/thread/lfqueue.cpp
index 5861999b78..102e25c2a3 100644
--- a/util/thread/lfqueue.cpp
+++ b/util/thread/lfqueue.cpp
@@ -1 +1 @@
-#include "lfqueue.h"
+#include "lfqueue.h"
diff --git a/util/thread/lfqueue.h b/util/thread/lfqueue.h
index ab523631e4..1d2a776bbb 100644
--- a/util/thread/lfqueue.h
+++ b/util/thread/lfqueue.h
@@ -5,15 +5,15 @@
#include <util/generic/ptr.h>
#include <util/system/atomic.h>
#include <util/system/yassert.h>
-#include "lfstack.h"
+#include "lfstack.h"
-struct TDefaultLFCounter {
- template <class T>
- void IncCount(const T& data) {
+struct TDefaultLFCounter {
+ template <class T>
+ void IncCount(const T& data) {
(void)data;
}
- template <class T>
- void DecCount(const T& data) {
+ template <class T>
+ void DecCount(const T& data) {
(void)data;
}
};
@@ -25,7 +25,7 @@ struct TDefaultLFCounter {
// it is TCounter class responsibility to check validity of passed object
template <class T, class TCounter>
class TLockFreeQueue: public TNonCopyable {
- struct TListNode {
+ struct TListNode {
template <typename U>
TListNode(U&& u, TListNode* next)
: Next(next)
@@ -39,30 +39,30 @@ class TLockFreeQueue: public TNonCopyable {
{
}
- TListNode* volatile Next;
+ TListNode* volatile Next;
T Data;
};
// using inheritance to be able to use 0 bytes for TCounter when we don't need one
- struct TRootNode: public TCounter {
- TListNode* volatile PushQueue;
- TListNode* volatile PopQueue;
- TListNode* volatile ToDelete;
- TRootNode* volatile NextFree;
+ struct TRootNode: public TCounter {
+ TListNode* volatile PushQueue;
+ TListNode* volatile PopQueue;
+ TListNode* volatile ToDelete;
+ TRootNode* volatile NextFree;
- TRootNode()
+ TRootNode()
: PushQueue(nullptr)
, PopQueue(nullptr)
, ToDelete(nullptr)
, NextFree(nullptr)
{
}
- void CopyCounter(TRootNode* x) {
+ void CopyCounter(TRootNode* x) {
*(TCounter*)this = *(TCounter*)x;
}
};
- static void EraseList(TListNode* n) {
+ static void EraseList(TListNode* n) {
while (n) {
TListNode* keepNext = AtomicGet(n->Next);
delete n;
@@ -75,12 +75,12 @@ class TLockFreeQueue: public TNonCopyable {
alignas(64) volatile TAtomic FreeingTaskCounter;
alignas(64) TRootNode* volatile FreePtr;
- void TryToFreeAsyncMemory() {
+ void TryToFreeAsyncMemory() {
TAtomic keepCounter = AtomicAdd(FreeingTaskCounter, 0);
TRootNode* current = AtomicGet(FreePtr);
if (current == nullptr)
return;
- if (AtomicAdd(FreememCounter, 0) == 1) {
+ if (AtomicAdd(FreememCounter, 0) == 1) {
// we are the last thread, try to cleanup
// check if another thread have cleaned up
if (keepCounter != AtomicAdd(FreeingTaskCounter, 0)) {
@@ -98,24 +98,24 @@ class TLockFreeQueue: public TNonCopyable {
}
}
}
- void AsyncRef() {
- AtomicAdd(FreememCounter, 1);
+ void AsyncRef() {
+ AtomicAdd(FreememCounter, 1);
}
- void AsyncUnref() {
+ void AsyncUnref() {
TryToFreeAsyncMemory();
- AtomicAdd(FreememCounter, -1);
+ AtomicAdd(FreememCounter, -1);
}
- void AsyncDel(TRootNode* toDelete, TListNode* lst) {
+ void AsyncDel(TRootNode* toDelete, TListNode* lst) {
AtomicSet(toDelete->ToDelete, lst);
- for (;;) {
+ for (;;) {
AtomicSet(toDelete->NextFree, AtomicGet(FreePtr));
if (AtomicCas(&FreePtr, toDelete, AtomicGet(toDelete->NextFree)))
break;
}
}
- void AsyncUnref(TRootNode* toDelete, TListNode* lst) {
+ void AsyncUnref(TRootNode* toDelete, TListNode* lst) {
TryToFreeAsyncMemory();
- if (AtomicAdd(FreememCounter, -1) == 0) {
+ if (AtomicAdd(FreememCounter, -1) == 0) {
// no other operations in progress, can safely reclaim memory
EraseList(lst);
delete toDelete;
@@ -125,27 +125,27 @@ class TLockFreeQueue: public TNonCopyable {
}
}
- struct TListInvertor {
- TListNode* Copy;
- TListNode* Tail;
- TListNode* PrevFirst;
+ struct TListInvertor {
+ TListNode* Copy;
+ TListNode* Tail;
+ TListNode* PrevFirst;
- TListInvertor()
+ TListInvertor()
: Copy(nullptr)
, Tail(nullptr)
, PrevFirst(nullptr)
{
}
- ~TListInvertor() {
+ ~TListInvertor() {
EraseList(Copy);
}
- void CopyWasUsed() {
+ void CopyWasUsed() {
Copy = nullptr;
Tail = nullptr;
PrevFirst = nullptr;
}
- void DoCopy(TListNode* ptr) {
- TListNode* newFirst = ptr;
+ void DoCopy(TListNode* ptr) {
+ TListNode* newFirst = ptr;
TListNode* newCopy = nullptr;
TListNode* newTail = nullptr;
while (ptr) {
@@ -171,8 +171,8 @@ class TLockFreeQueue: public TNonCopyable {
}
};
- void EnqueueImpl(TListNode* head, TListNode* tail) {
- TRootNode* newRoot = new TRootNode;
+ void EnqueueImpl(TListNode* head, TListNode* tail) {
+ TRootNode* newRoot = new TRootNode;
AsyncRef();
AtomicSet(newRoot->PushQueue, head);
for (;;) {
@@ -225,14 +225,14 @@ class TLockFreeQueue: public TNonCopyable {
}
public:
- TLockFreeQueue()
+ TLockFreeQueue()
: JobQueue(new TRootNode)
, FreememCounter(0)
, FreeingTaskCounter(0)
, FreePtr(nullptr)
{
}
- ~TLockFreeQueue() {
+ ~TLockFreeQueue() {
AsyncRef();
AsyncUnref(); // should free FreeList
EraseList(JobQueue->PushQueue);
@@ -253,17 +253,17 @@ public:
EnqueueImpl(newNode, newNode);
}
template <typename TCollection>
- void EnqueueAll(const TCollection& data) {
+ void EnqueueAll(const TCollection& data) {
EnqueueAll(data.begin(), data.end());
}
template <typename TIter>
- void EnqueueAll(TIter dataBegin, TIter dataEnd) {
+ void EnqueueAll(TIter dataBegin, TIter dataEnd) {
if (dataBegin == dataEnd)
return;
TIter i = dataBegin;
TListNode* volatile node = new TListNode(*i);
- TListNode* volatile tail = node;
+ TListNode* volatile tail = node;
for (++i; i != dataEnd; ++i) {
TListNode* nextNode = node;
@@ -271,7 +271,7 @@ public:
}
EnqueueImpl(node, tail);
}
- bool Dequeue(T* data) {
+ bool Dequeue(T* data) {
TRootNode* newRoot = nullptr;
TListInvertor listInvertor;
AsyncRef();
@@ -288,7 +288,7 @@ public:
newRoot->CopyCounter(curRoot);
newRoot->DecCount(tail->Data);
Y_ASSERT(AtomicGet(curRoot->PopQueue) == tail);
- if (AtomicCas(&JobQueue, newRoot, curRoot)) {
+ if (AtomicCas(&JobQueue, newRoot, curRoot)) {
*data = std::move(tail->Data);
AtomicSet(tail->Next, nullptr);
AsyncUnref(curRoot, tail);
@@ -309,7 +309,7 @@ public:
AtomicSet(newRoot->PopQueue, listInvertor.Copy);
newRoot->CopyCounter(curRoot);
Y_ASSERT(AtomicGet(curRoot->PopQueue) == nullptr);
- if (AtomicCas(&JobQueue, newRoot, curRoot)) {
+ if (AtomicCas(&JobQueue, newRoot, curRoot)) {
newRoot = nullptr;
listInvertor.CopyWasUsed();
AsyncDel(curRoot, AtomicGet(curRoot->PushQueue));
@@ -343,14 +343,14 @@ public:
AsyncUnref(curRoot, toDeleteHead);
}
- bool IsEmpty() {
+ bool IsEmpty() {
AsyncRef();
TRootNode* curRoot = AtomicGet(JobQueue);
bool res = AtomicGet(curRoot->PushQueue) == nullptr && AtomicGet(curRoot->PopQueue) == nullptr;
AsyncUnref();
return res;
}
- TCounter GetCounter() {
+ TCounter GetCounter() {
AsyncRef();
TRootNode* curRoot = AtomicGet(JobQueue);
TCounter res = *(TCounter*)curRoot;
@@ -367,8 +367,8 @@ public:
inline ~TAutoLockFreeQueue() {
TRef tmp;
- while (Dequeue(&tmp)) {
- }
+ while (Dequeue(&tmp)) {
+ }
}
inline bool Dequeue(TRef* t) {
diff --git a/util/thread/lfqueue_ut.cpp b/util/thread/lfqueue_ut.cpp
index 83bca100cf..1290ce0a46 100644
--- a/util/thread/lfqueue_ut.cpp
+++ b/util/thread/lfqueue_ut.cpp
@@ -7,7 +7,7 @@
#include <util/system/atomic.h>
#include <util/thread/pool.h>
-#include "lfqueue.h"
+#include "lfqueue.h"
class TMoveTest {
public:
@@ -54,24 +54,24 @@ private:
class TOperationsChecker {
public:
TOperationsChecker() {
- ++DefaultCtor_;
+ ++DefaultCtor_;
}
TOperationsChecker(TOperationsChecker&&) {
- ++MoveCtor_;
+ ++MoveCtor_;
}
TOperationsChecker(const TOperationsChecker&) {
- ++CopyCtor_;
+ ++CopyCtor_;
}
TOperationsChecker& operator=(TOperationsChecker&&) {
- ++MoveAssign_;
+ ++MoveAssign_;
return *this;
}
TOperationsChecker& operator=(const TOperationsChecker&) {
- ++CopyAssign_;
+ ++CopyAssign_;
return *this;
}
@@ -190,7 +190,7 @@ Y_UNIT_TEST_SUITE(TLockFreeQueueTests) {
UNIT_ASSERT(!queue.Dequeue(&i));
}
- void DequeueAllRunner(TLockFreeQueue<int>& queue, bool singleConsumer) {
+ void DequeueAllRunner(TLockFreeQueue<int>& queue, bool singleConsumer) {
size_t threadsNum = 4;
size_t enqueuesPerThread = 10'000;
TThreadPool p;
@@ -303,11 +303,11 @@ Y_UNIT_TEST_SUITE(TLockFreeQueueTests) {
}
Y_UNIT_TEST(CleanInDestructor) {
- TSimpleSharedPtr<bool> p(new bool);
+ TSimpleSharedPtr<bool> p(new bool);
UNIT_ASSERT_VALUES_EQUAL(1u, p.RefCount());
{
- TLockFreeQueue<TSimpleSharedPtr<bool>> stack;
+ TLockFreeQueue<TSimpleSharedPtr<bool>> stack;
stack.Enqueue(p);
stack.Enqueue(p);
diff --git a/util/thread/lfstack.cpp b/util/thread/lfstack.cpp
index be8b3bdf37..6408639189 100644
--- a/util/thread/lfstack.cpp
+++ b/util/thread/lfstack.cpp
@@ -1 +1 @@
-#include "lfstack.h"
+#include "lfstack.h"
diff --git a/util/thread/lfstack.h b/util/thread/lfstack.h
index ca3d95f3c3..aa64ce1dbe 100644
--- a/util/thread/lfstack.h
+++ b/util/thread/lfstack.h
@@ -5,31 +5,31 @@
//////////////////////////////
// lock free lifo stack
-template <class T>
-class TLockFreeStack: TNonCopyable {
- struct TNode {
+template <class T>
+class TLockFreeStack: TNonCopyable {
+ struct TNode {
T Value;
- TNode* Next;
+ TNode* Next;
TNode() = default;
- template <class U>
+ template <class U>
explicit TNode(U&& val)
: Value(std::forward<U>(val))
, Next(nullptr)
- {
- }
+ {
+ }
};
TNode* Head;
TNode* FreePtr;
TAtomic DequeueCount;
- void TryToFreeMemory() {
+ void TryToFreeMemory() {
TNode* current = AtomicGet(FreePtr);
if (!current)
return;
- if (AtomicAdd(DequeueCount, 0) == 1) {
+ if (AtomicAdd(DequeueCount, 0) == 1) {
// node current is in free list, we are the last thread so try to cleanup
if (AtomicCas(&FreePtr, (TNode*)nullptr, current))
EraseList(current);
@@ -37,7 +37,7 @@ class TLockFreeStack: TNonCopyable {
}
void EraseList(TNode* volatile p) {
while (p) {
- TNode* next = p->Next;
+ TNode* next = p->Next;
delete p;
p = next;
}
@@ -54,20 +54,20 @@ class TLockFreeStack: TNonCopyable {
TNode* volatile node = new TNode(std::forward<U>(u));
EnqueueImpl(node, node);
}
-
+
public:
- TLockFreeStack()
+ TLockFreeStack()
: Head(nullptr)
, FreePtr(nullptr)
- , DequeueCount(0)
- {
- }
- ~TLockFreeStack() {
+ , DequeueCount(0)
+ {
+ }
+ ~TLockFreeStack() {
EraseList(Head);
EraseList(FreePtr);
}
- void Enqueue(const T& t) {
+ void Enqueue(const T& t) {
EnqueueImpl(t);
}
@@ -76,11 +76,11 @@ public:
}
template <typename TCollection>
- void EnqueueAll(const TCollection& data) {
+ void EnqueueAll(const TCollection& data) {
EnqueueAll(data.begin(), data.end());
}
template <typename TIter>
- void EnqueueAll(TIter dataBegin, TIter dataEnd) {
+ void EnqueueAll(TIter dataBegin, TIter dataEnd) {
if (dataBegin == dataEnd) {
return;
}
@@ -95,22 +95,22 @@ public:
}
EnqueueImpl(node, tail);
}
- bool Dequeue(T* res) {
- AtomicAdd(DequeueCount, 1);
+ bool Dequeue(T* res) {
+ AtomicAdd(DequeueCount, 1);
for (TNode* current = AtomicGet(Head); current; current = AtomicGet(Head)) {
if (AtomicCas(&Head, AtomicGet(current->Next), current)) {
*res = std::move(current->Value);
// delete current; // ABA problem
// even more complex node deletion
TryToFreeMemory();
- if (AtomicAdd(DequeueCount, -1) == 0) {
+ if (AtomicAdd(DequeueCount, -1) == 0) {
// no other Dequeue()s, can safely reclaim memory
delete current;
} else {
// Dequeue()s in progress, put node to free list
- for (;;) {
+ for (;;) {
AtomicSet(current->Next, AtomicGet(FreePtr));
- if (AtomicCas(&FreePtr, current, current->Next))
+ if (AtomicCas(&FreePtr, current, current->Next))
break;
}
}
@@ -118,17 +118,17 @@ public:
}
}
TryToFreeMemory();
- AtomicAdd(DequeueCount, -1);
+ AtomicAdd(DequeueCount, -1);
return false;
}
// add all elements to *res
// elements are returned in order of dequeue (top to bottom; see example in unittest)
template <typename TCollection>
- void DequeueAll(TCollection* res) {
+ void DequeueAll(TCollection* res) {
AtomicAdd(DequeueCount, 1);
for (TNode* current = AtomicGet(Head); current; current = AtomicGet(Head)) {
if (AtomicCas(&Head, (TNode*)nullptr, current)) {
- for (TNode* x = current; x;) {
+ for (TNode* x = current; x;) {
res->push_back(std::move(x->Value));
x = x->Next;
}
@@ -140,9 +140,9 @@ public:
EraseList(current);
} else {
// Dequeue()s in progress, add nodes list to free list
- TNode* currentLast = current;
+ TNode* currentLast = current;
while (currentLast->Next) {
- currentLast = currentLast->Next;
+ currentLast = currentLast->Next;
}
for (;;) {
AtomicSet(currentLast->Next, AtomicGet(FreePtr));
@@ -156,7 +156,7 @@ public:
TryToFreeMemory();
AtomicAdd(DequeueCount, -1);
}
- bool DequeueSingleConsumer(T* res) {
+ bool DequeueSingleConsumer(T* res) {
for (TNode* current = AtomicGet(Head); current; current = AtomicGet(Head)) {
if (AtomicCas(&Head, current->Next, current)) {
*res = std::move(current->Value);
@@ -169,10 +169,10 @@ public:
// add all elements to *res
// elements are returned in order of dequeue (top to bottom; see example in unittest)
template <typename TCollection>
- void DequeueAllSingleConsumer(TCollection* res) {
+ void DequeueAllSingleConsumer(TCollection* res) {
for (TNode* current = AtomicGet(Head); current; current = AtomicGet(Head)) {
if (AtomicCas(&Head, (TNode*)nullptr, current)) {
- for (TNode* x = current; x;) {
+ for (TNode* x = current; x;) {
res->push_back(std::move(x->Value));
x = x->Next;
}
@@ -181,7 +181,7 @@ public:
}
}
}
- bool IsEmpty() {
+ bool IsEmpty() {
AtomicAdd(DequeueCount, 0); // mem barrier
return AtomicGet(Head) == nullptr; // without lock, so result is approximate
}
diff --git a/util/thread/lfstack_ut.cpp b/util/thread/lfstack_ut.cpp
index e20a838f95..61a8137db9 100644
--- a/util/thread/lfstack_ut.cpp
+++ b/util/thread/lfstack_ut.cpp
@@ -6,19 +6,19 @@
#include <library/cpp/testing/unittest/registar.h>
-#include "lfstack.h"
+#include "lfstack.h"
Y_UNIT_TEST_SUITE(TLockFreeStackTests) {
class TCountDownLatch {
private:
TAtomic Current_;
TSystemEvent EventObject_;
-
+
public:
TCountDownLatch(unsigned initial)
: Current_(initial)
- {
- }
+ {
+ }
void CountDown() {
if (AtomicDecrement(Current_) == 0) {
@@ -175,11 +175,11 @@ Y_UNIT_TEST_SUITE(TLockFreeStackTests) {
}
Y_UNIT_TEST(CleanInDestructor) {
- TSimpleSharedPtr<bool> p(new bool);
+ TSimpleSharedPtr<bool> p(new bool);
UNIT_ASSERT_VALUES_EQUAL(1u, p.RefCount());
{
- TLockFreeStack<TSimpleSharedPtr<bool>> stack;
+ TLockFreeStack<TSimpleSharedPtr<bool>> stack;
stack.Enqueue(p);
stack.Enqueue(p);
@@ -193,14 +193,14 @@ Y_UNIT_TEST_SUITE(TLockFreeStackTests) {
Y_UNIT_TEST(NoCopyTest) {
static unsigned copied = 0;
struct TCopyCount {
- TCopyCount(int) {
- }
- TCopyCount(const TCopyCount&) {
- ++copied;
- }
-
- TCopyCount(TCopyCount&&) {
- }
+ TCopyCount(int) {
+ }
+ TCopyCount(const TCopyCount&) {
+ ++copied;
+ }
+
+ TCopyCount(TCopyCount&&) {
+ }
TCopyCount& operator=(const TCopyCount&) {
++copied;
diff --git a/util/thread/pool.cpp b/util/thread/pool.cpp
index 05fad02e9b..0275429304 100644
--- a/util/thread/pool.cpp
+++ b/util/thread/pool.cpp
@@ -2,31 +2,31 @@
#include <util/system/defaults.h>
-#if defined(_unix_)
- #include <pthread.h>
-#endif
-
+#if defined(_unix_)
+ #include <pthread.h>
+#endif
+
#include <util/generic/vector.h>
-#include <util/generic/intrlist.h>
+#include <util/generic/intrlist.h>
#include <util/generic/yexception.h>
#include <util/generic/ylimits.h>
-#include <util/generic/singleton.h>
+#include <util/generic/singleton.h>
#include <util/generic/fastqueue.h>
-
+
#include <util/stream/output.h>
#include <util/string/builder.h>
-
-#include <util/system/event.h>
-#include <util/system/mutex.h>
-#include <util/system/atomic.h>
-#include <util/system/condvar.h>
+
+#include <util/system/event.h>
+#include <util/system/mutex.h>
+#include <util/system/atomic.h>
+#include <util/system/condvar.h>
#include <util/system/thread.h>
#include <util/datetime/base.h>
#include "factory.h"
#include "pool.h"
-
+
namespace {
class TThreadNamer {
public:
@@ -36,7 +36,7 @@ namespace {
{
}
- explicit operator bool() const {
+ explicit operator bool() const {
return !ThreadName.empty();
}
@@ -62,269 +62,269 @@ namespace {
TThreadFactoryHolder::TThreadFactoryHolder() noexcept
: Pool_(SystemThreadFactory())
-{
-}
-
+{
+}
+
class TThreadPool::TImpl: public TIntrusiveListItem<TImpl>, public IThreadFactory::IThreadAble {
using TTsr = IThreadPool::TTsr;
using TJobQueue = TFastQueue<IObjectInQueue*>;
using TThreadRef = THolder<IThreadFactory::IThread>;
-
-public:
+
+public:
inline TImpl(TThreadPool* parent, size_t thrnum, size_t maxqueue, const TParams& params)
- : Parent_(parent)
+ : Parent_(parent)
, Blocking(params.Blocking_)
, Catching(params.Catching_)
, Namer(params)
, ShouldTerminate(1)
- , MaxQueueSize(0)
- , ThreadCountExpected(0)
- , ThreadCountReal(0)
- , Forked(false)
- {
- TAtforkQueueRestarter::Get().RegisterObject(this);
- Start(thrnum, maxqueue);
- }
-
+ , MaxQueueSize(0)
+ , ThreadCountExpected(0)
+ , ThreadCountReal(0)
+ , Forked(false)
+ {
+ TAtforkQueueRestarter::Get().RegisterObject(this);
+ Start(thrnum, maxqueue);
+ }
+
inline ~TImpl() override {
- try {
- Stop();
- } catch (...) {
- // ¯\_(ツ)_/¯
- }
-
- TAtforkQueueRestarter::Get().UnregisterObject(this);
+ try {
+ Stop();
+ } catch (...) {
+ // ¯\_(ツ)_/¯
+ }
+
+ TAtforkQueueRestarter::Get().UnregisterObject(this);
Y_ASSERT(Tharr.empty());
- }
-
- inline bool Add(IObjectInQueue* obj) {
+ }
+
+ inline bool Add(IObjectInQueue* obj) {
if (AtomicGet(ShouldTerminate)) {
- return false;
- }
-
- if (Tharr.empty()) {
- TTsr tsr(Parent_);
- obj->Process(tsr);
-
- return true;
- }
-
- with_lock (QueueMutex) {
+ return false;
+ }
+
+ if (Tharr.empty()) {
+ TTsr tsr(Parent_);
+ obj->Process(tsr);
+
+ return true;
+ }
+
+ with_lock (QueueMutex) {
while (MaxQueueSize > 0 && Queue.Size() >= MaxQueueSize && !AtomicGet(ShouldTerminate)) {
- if (!Blocking) {
- return false;
- }
- QueuePopCond.Wait(QueueMutex);
- }
-
+ if (!Blocking) {
+ return false;
+ }
+ QueuePopCond.Wait(QueueMutex);
+ }
+
if (AtomicGet(ShouldTerminate)) {
- return false;
- }
-
- Queue.Push(obj);
+ return false;
+ }
+
+ Queue.Push(obj);
}
-
+
QueuePushCond.Signal();
- return true;
- }
-
+ return true;
+ }
+
inline size_t Size() const noexcept {
- auto guard = Guard(QueueMutex);
+ auto guard = Guard(QueueMutex);
- return Queue.Size();
- }
+ return Queue.Size();
+ }
inline size_t GetMaxQueueSize() const noexcept {
- return MaxQueueSize;
- }
+ return MaxQueueSize;
+ }
inline size_t GetThreadCountExpected() const noexcept {
- return ThreadCountExpected;
- }
+ return ThreadCountExpected;
+ }
inline size_t GetThreadCountReal() const noexcept {
return ThreadCountReal;
}
inline void AtforkAction() noexcept Y_NO_SANITIZE("thread") {
- Forked = true;
- }
-
+ Forked = true;
+ }
+
inline bool NeedRestart() const noexcept {
- return Forked;
- }
-
-private:
- inline void Start(size_t num, size_t maxque) {
+ return Forked;
+ }
+
+private:
+ inline void Start(size_t num, size_t maxque) {
AtomicSet(ShouldTerminate, 0);
- MaxQueueSize = maxque;
- ThreadCountExpected = num;
-
- try {
- for (size_t i = 0; i < num; ++i) {
- Tharr.push_back(Parent_->Pool()->Run(this));
- ++ThreadCountReal;
- }
- } catch (...) {
- Stop();
-
- throw;
- }
- }
-
- inline void Stop() {
+ MaxQueueSize = maxque;
+ ThreadCountExpected = num;
+
+ try {
+ for (size_t i = 0; i < num; ++i) {
+ Tharr.push_back(Parent_->Pool()->Run(this));
+ ++ThreadCountReal;
+ }
+ } catch (...) {
+ Stop();
+
+ throw;
+ }
+ }
+
+ inline void Stop() {
AtomicSet(ShouldTerminate, 1);
- with_lock (QueueMutex) {
- QueuePopCond.BroadCast();
- }
-
- if (!NeedRestart()) {
- WaitForComplete();
- }
-
- Tharr.clear();
- ThreadCountExpected = 0;
- MaxQueueSize = 0;
- }
-
+ with_lock (QueueMutex) {
+ QueuePopCond.BroadCast();
+ }
+
+ if (!NeedRestart()) {
+ WaitForComplete();
+ }
+
+ Tharr.clear();
+ ThreadCountExpected = 0;
+ MaxQueueSize = 0;
+ }
+
inline void WaitForComplete() noexcept {
- with_lock (StopMutex) {
- while (ThreadCountReal) {
- with_lock (QueueMutex) {
+ with_lock (StopMutex) {
+ while (ThreadCountReal) {
+ with_lock (QueueMutex) {
QueuePushCond.Signal();
- }
-
- StopCond.Wait(StopMutex);
- }
- }
- }
-
+ }
+
+ StopCond.Wait(StopMutex);
+ }
+ }
+ }
+
void DoExecute() override {
- THolder<TTsr> tsr(new TTsr(Parent_));
-
+ THolder<TTsr> tsr(new TTsr(Parent_));
+
if (Namer) {
Namer.SetCurrentThreadName();
}
- while (true) {
+ while (true) {
IObjectInQueue* job = nullptr;
-
- with_lock (QueueMutex) {
+
+ with_lock (QueueMutex) {
while (Queue.Empty() && !AtomicGet(ShouldTerminate)) {
QueuePushCond.Wait(QueueMutex);
- }
-
+ }
+
if (AtomicGet(ShouldTerminate) && Queue.Empty()) {
- tsr.Destroy();
-
- break;
- }
-
- job = Queue.Pop();
- }
-
- QueuePopCond.Signal();
-
+ tsr.Destroy();
+
+ break;
+ }
+
+ job = Queue.Pop();
+ }
+
+ QueuePopCond.Signal();
+
if (Catching) {
- try {
+ try {
try {
job->Process(*tsr);
} catch (...) {
Cdbg << "[mtp queue] " << CurrentExceptionMessage() << Endl;
}
- } catch (...) {
- // ¯\_(ツ)_/¯
- }
+ } catch (...) {
+ // ¯\_(ツ)_/¯
+ }
} else {
job->Process(*tsr);
- }
- }
-
- FinishOneThread();
- }
-
+ }
+ }
+
+ FinishOneThread();
+ }
+
inline void FinishOneThread() noexcept {
- auto guard = Guard(StopMutex);
-
- --ThreadCountReal;
- StopCond.Signal();
- }
+ auto guard = Guard(StopMutex);
+
+ --ThreadCountReal;
+ StopCond.Signal();
+ }
-private:
+private:
TThreadPool* Parent_;
const bool Blocking;
const bool Catching;
TThreadNamer Namer;
- mutable TMutex QueueMutex;
- mutable TMutex StopMutex;
+ mutable TMutex QueueMutex;
+ mutable TMutex StopMutex;
TCondVar QueuePushCond;
- TCondVar QueuePopCond;
- TCondVar StopCond;
- TJobQueue Queue;
+ TCondVar QueuePopCond;
+ TCondVar StopCond;
+ TJobQueue Queue;
TVector<TThreadRef> Tharr;
TAtomic ShouldTerminate;
- size_t MaxQueueSize;
- size_t ThreadCountExpected;
- size_t ThreadCountReal;
- bool Forked;
-
- class TAtforkQueueRestarter {
- public:
- static TAtforkQueueRestarter& Get() {
- return *SingletonWithPriority<TAtforkQueueRestarter, 256>();
- }
-
- inline void RegisterObject(TImpl* obj) {
- auto guard = Guard(ActionMutex);
-
- RegisteredObjects.PushBack(obj);
- }
-
- inline void UnregisterObject(TImpl* obj) {
- auto guard = Guard(ActionMutex);
-
- obj->Unlink();
- }
-
- private:
- void ChildAction() {
- with_lock (ActionMutex) {
- for (auto it = RegisteredObjects.Begin(); it != RegisteredObjects.End(); ++it) {
- it->AtforkAction();
- }
- }
- }
-
- static void ProcessChildAction() {
- Get().ChildAction();
- }
-
- TIntrusiveList<TImpl> RegisteredObjects;
- TMutex ActionMutex;
-
- public:
- inline TAtforkQueueRestarter() {
-#if defined(_bionic_)
-//no pthread_atfork on android libc
-#elif defined(_unix_)
+ size_t MaxQueueSize;
+ size_t ThreadCountExpected;
+ size_t ThreadCountReal;
+ bool Forked;
+
+ class TAtforkQueueRestarter {
+ public:
+ static TAtforkQueueRestarter& Get() {
+ return *SingletonWithPriority<TAtforkQueueRestarter, 256>();
+ }
+
+ inline void RegisterObject(TImpl* obj) {
+ auto guard = Guard(ActionMutex);
+
+ RegisteredObjects.PushBack(obj);
+ }
+
+ inline void UnregisterObject(TImpl* obj) {
+ auto guard = Guard(ActionMutex);
+
+ obj->Unlink();
+ }
+
+ private:
+ void ChildAction() {
+ with_lock (ActionMutex) {
+ for (auto it = RegisteredObjects.Begin(); it != RegisteredObjects.End(); ++it) {
+ it->AtforkAction();
+ }
+ }
+ }
+
+ static void ProcessChildAction() {
+ Get().ChildAction();
+ }
+
+ TIntrusiveList<TImpl> RegisteredObjects;
+ TMutex ActionMutex;
+
+ public:
+ inline TAtforkQueueRestarter() {
+#if defined(_bionic_)
+//no pthread_atfork on android libc
+#elif defined(_unix_)
pthread_atfork(nullptr, nullptr, ProcessChildAction);
#endif
- }
- };
-};
-
+ }
+ };
+};
+
TThreadPool::~TThreadPool() = default;
-
+
size_t TThreadPool::Size() const noexcept {
- if (!Impl_.Get()) {
- return 0;
- }
-
- return Impl_->Size();
-}
-
+ if (!Impl_.Get()) {
+ return 0;
+ }
+
+ return Impl_->Size();
+}
+
size_t TThreadPool::GetThreadCountExpected() const noexcept {
if (!Impl_.Get()) {
return 0;
@@ -351,298 +351,298 @@ size_t TThreadPool::GetMaxQueueSize() const noexcept {
bool TThreadPool::Add(IObjectInQueue* obj) {
Y_ENSURE_EX(Impl_.Get(), TThreadPoolException() << TStringBuf("mtp queue not started"));
-
- if (Impl_->NeedRestart()) {
- Start(Impl_->GetThreadCountExpected(), Impl_->GetMaxQueueSize());
+
+ if (Impl_->NeedRestart()) {
+ Start(Impl_->GetThreadCountExpected(), Impl_->GetMaxQueueSize());
}
- return Impl_->Add(obj);
+ return Impl_->Add(obj);
}
void TThreadPool::Start(size_t thrnum, size_t maxque) {
Impl_.Reset(new TImpl(this, thrnum, maxque, Params));
-}
-
+}
+
void TThreadPool::Stop() noexcept {
- Impl_.Destroy();
-}
-
-static TAtomic mtp_queue_counter = 0;
-
+ Impl_.Destroy();
+}
+
+static TAtomic mtp_queue_counter = 0;
+
class TAdaptiveThreadPool::TImpl {
-public:
+public:
class TThread: public IThreadFactory::IThreadAble {
public:
- inline TThread(TImpl* parent)
- : Impl_(parent)
- , Thread_(Impl_->Parent_->Pool()->Run(this))
- {
- }
-
+ inline TThread(TImpl* parent)
+ : Impl_(parent)
+ , Thread_(Impl_->Parent_->Pool()->Run(this))
+ {
+ }
+
inline ~TThread() override {
- Impl_->DecThreadCount();
- }
-
- private:
+ Impl_->DecThreadCount();
+ }
+
+ private:
void DoExecute() noexcept override {
- THolder<TThread> This(this);
-
+ THolder<TThread> This(this);
+
if (Impl_->Namer) {
Impl_->Namer.SetCurrentThreadName();
}
- {
- TTsr tsr(Impl_->Parent_);
- IObjectInQueue* obj;
-
+ {
+ TTsr tsr(Impl_->Parent_);
+ IObjectInQueue* obj;
+
while ((obj = Impl_->WaitForJob()) != nullptr) {
if (Impl_->Catching) {
- try {
+ try {
try {
obj->Process(tsr);
} catch (...) {
Cdbg << Impl_->Name() << " " << CurrentExceptionMessage() << Endl;
}
- } catch (...) {
+ } catch (...) {
// ¯\_(ツ)_/¯
- }
+ }
} else {
obj->Process(tsr);
- }
- }
- }
- }
-
- private:
- TImpl* Impl_;
+ }
+ }
+ }
+ }
+
+ private:
+ TImpl* Impl_;
THolder<IThreadFactory::IThread> Thread_;
- };
-
+ };
+
inline TImpl(TAdaptiveThreadPool* parent, const TParams& params)
- : Parent_(parent)
+ : Parent_(parent)
, Catching(params.Catching_)
, Namer(params)
- , ThrCount_(0)
- , AllDone_(false)
+ , ThrCount_(0)
+ , AllDone_(false)
, Obj_(nullptr)
- , Free_(0)
- , IdleTime_(TDuration::Max())
- {
- sprintf(Name_, "[mtp queue %ld]", (long)AtomicAdd(mtp_queue_counter, 1));
- }
-
+ , Free_(0)
+ , IdleTime_(TDuration::Max())
+ {
+ sprintf(Name_, "[mtp queue %ld]", (long)AtomicAdd(mtp_queue_counter, 1));
+ }
+
inline ~TImpl() {
- Stop();
- }
-
- inline void SetMaxIdleTime(TDuration idleTime) {
- IdleTime_ = idleTime;
- }
-
+ Stop();
+ }
+
+ inline void SetMaxIdleTime(TDuration idleTime) {
+ IdleTime_ = idleTime;
+ }
+
inline const char* Name() const noexcept {
- return Name_;
- }
-
- inline void Add(IObjectInQueue* obj) {
- with_lock (Mutex_) {
+ return Name_;
+ }
+
+ inline void Add(IObjectInQueue* obj) {
+ with_lock (Mutex_) {
while (Obj_ != nullptr) {
- CondFree_.Wait(Mutex_);
- }
-
- if (Free_ == 0) {
- AddThreadNoLock();
- }
-
- Obj_ = obj;
-
+ CondFree_.Wait(Mutex_);
+ }
+
+ if (Free_ == 0) {
+ AddThreadNoLock();
+ }
+
+ Obj_ = obj;
+
Y_ENSURE_EX(!AllDone_, TThreadPoolException() << TStringBuf("adding to a stopped queue"));
- }
-
- CondReady_.Signal();
- }
-
- inline void AddThreads(size_t n) {
- with_lock (Mutex_) {
- while (n) {
- AddThreadNoLock();
-
- --n;
- }
- }
- }
-
+ }
+
+ CondReady_.Signal();
+ }
+
+ inline void AddThreads(size_t n) {
+ with_lock (Mutex_) {
+ while (n) {
+ AddThreadNoLock();
+
+ --n;
+ }
+ }
+ }
+
inline size_t Size() const noexcept {
- return (size_t)ThrCount_;
- }
-
-private:
+ return (size_t)ThrCount_;
+ }
+
+private:
inline void IncThreadCount() noexcept {
- AtomicAdd(ThrCount_, 1);
- }
-
+ AtomicAdd(ThrCount_, 1);
+ }
+
inline void DecThreadCount() noexcept {
- AtomicAdd(ThrCount_, -1);
- }
-
- inline void AddThreadNoLock() {
- IncThreadCount();
-
- try {
- new TThread(this);
- } catch (...) {
- DecThreadCount();
-
- throw;
- }
- }
-
+ AtomicAdd(ThrCount_, -1);
+ }
+
+ inline void AddThreadNoLock() {
+ IncThreadCount();
+
+ try {
+ new TThread(this);
+ } catch (...) {
+ DecThreadCount();
+
+ throw;
+ }
+ }
+
inline void Stop() noexcept {
- Mutex_.Acquire();
-
- AllDone_ = true;
-
+ Mutex_.Acquire();
+
+ AllDone_ = true;
+
while (AtomicGet(ThrCount_)) {
- Mutex_.Release();
- CondReady_.Signal();
- Mutex_.Acquire();
- }
-
- Mutex_.Release();
- }
-
+ Mutex_.Release();
+ CondReady_.Signal();
+ Mutex_.Acquire();
+ }
+
+ Mutex_.Release();
+ }
+
inline IObjectInQueue* WaitForJob() noexcept {
- Mutex_.Acquire();
-
- ++Free_;
-
- while (!Obj_ && !AllDone_) {
+ Mutex_.Acquire();
+
+ ++Free_;
+
+ while (!Obj_ && !AllDone_) {
if (!CondReady_.WaitT(Mutex_, IdleTime_)) {
- break;
- }
- }
-
- IObjectInQueue* ret = Obj_;
+ break;
+ }
+ }
+
+ IObjectInQueue* ret = Obj_;
Obj_ = nullptr;
-
- --Free_;
-
- Mutex_.Release();
- CondFree_.Signal();
-
- return ret;
- }
-
-private:
+
+ --Free_;
+
+ Mutex_.Release();
+ CondFree_.Signal();
+
+ return ret;
+ }
+
+private:
TAdaptiveThreadPool* Parent_;
const bool Catching;
TThreadNamer Namer;
- TAtomic ThrCount_;
- TMutex Mutex_;
- TCondVar CondReady_;
- TCondVar CondFree_;
- bool AllDone_;
- IObjectInQueue* Obj_;
- size_t Free_;
- char Name_[64];
- TDuration IdleTime_;
-};
-
+ TAtomic ThrCount_;
+ TMutex Mutex_;
+ TCondVar CondReady_;
+ TCondVar CondFree_;
+ bool AllDone_;
+ IObjectInQueue* Obj_;
+ size_t Free_;
+ char Name_[64];
+ TDuration IdleTime_;
+};
+
TThreadPoolBase::TThreadPoolBase(const TParams& params)
: TThreadFactoryHolder(params.Factory_)
, Params(params)
-{
-}
-
+{
+}
+
#define DEFINE_THREAD_POOL_CTORS(type) \
- type::type(const TParams& params) \
- : TThreadPoolBase(params) \
- { \
- }
+ type::type(const TParams& params) \
+ : TThreadPoolBase(params) \
+ { \
+ }
DEFINE_THREAD_POOL_CTORS(TThreadPool)
DEFINE_THREAD_POOL_CTORS(TAdaptiveThreadPool)
DEFINE_THREAD_POOL_CTORS(TSimpleThreadPool)
TAdaptiveThreadPool::~TAdaptiveThreadPool() = default;
-
+
bool TAdaptiveThreadPool::Add(IObjectInQueue* obj) {
Y_ENSURE_EX(Impl_.Get(), TThreadPoolException() << TStringBuf("mtp queue not started"));
-
- Impl_->Add(obj);
-
- return true;
-}
-
+
+ Impl_->Add(obj);
+
+ return true;
+}
+
void TAdaptiveThreadPool::Start(size_t, size_t) {
Impl_.Reset(new TImpl(this, Params));
-}
-
+}
+
void TAdaptiveThreadPool::Stop() noexcept {
- Impl_.Destroy();
-}
-
+ Impl_.Destroy();
+}
+
size_t TAdaptiveThreadPool::Size() const noexcept {
- if (Impl_.Get()) {
- return Impl_->Size();
- }
-
- return 0;
-}
-
+ if (Impl_.Get()) {
+ return Impl_->Size();
+ }
+
+ return 0;
+}
+
void TAdaptiveThreadPool::SetMaxIdleTime(TDuration interval) {
Y_ENSURE_EX(Impl_.Get(), TThreadPoolException() << TStringBuf("mtp queue not started"));
-
+
Impl_->SetMaxIdleTime(interval);
-}
-
+}
+
TSimpleThreadPool::~TSimpleThreadPool() {
- try {
- Stop();
- } catch (...) {
- // ¯\_(ツ)_/¯
- }
-}
-
+ try {
+ Stop();
+ } catch (...) {
+ // ¯\_(ツ)_/¯
+ }
+}
+
bool TSimpleThreadPool::Add(IObjectInQueue* obj) {
Y_ENSURE_EX(Slave_.Get(), TThreadPoolException() << TStringBuf("mtp queue not started"));
-
- return Slave_->Add(obj);
-}
-
+
+ return Slave_->Add(obj);
+}
+
void TSimpleThreadPool::Start(size_t thrnum, size_t maxque) {
THolder<IThreadPool> tmp;
TAdaptiveThreadPool* adaptive(nullptr);
-
- if (thrnum) {
+
+ if (thrnum) {
tmp.Reset(new TThreadPoolBinder<TThreadPool, TSimpleThreadPool>(this, Params));
- } else {
+ } else {
adaptive = new TThreadPoolBinder<TAdaptiveThreadPool, TSimpleThreadPool>(this, Params);
- tmp.Reset(adaptive);
- }
-
- tmp->Start(thrnum, maxque);
-
- if (adaptive) {
+ tmp.Reset(adaptive);
+ }
+
+ tmp->Start(thrnum, maxque);
+
+ if (adaptive) {
adaptive->SetMaxIdleTime(TDuration::Seconds(100));
- }
-
- Slave_.Swap(tmp);
-}
-
+ }
+
+ Slave_.Swap(tmp);
+}
+
void TSimpleThreadPool::Stop() noexcept {
- Slave_.Destroy();
-}
-
+ Slave_.Destroy();
+}
+
size_t TSimpleThreadPool::Size() const noexcept {
- if (Slave_.Get()) {
- return Slave_->Size();
- }
-
- return 0;
-}
-
+ if (Slave_.Get()) {
+ return Slave_->Size();
+ }
+
+ return 0;
+}
+
namespace {
- class TOwnedObjectInQueue: public IObjectInQueue {
+ class TOwnedObjectInQueue: public IObjectInQueue {
private:
THolder<IObjectInQueue> Owned;
@@ -661,7 +661,7 @@ namespace {
void IThreadPool::SafeAdd(IObjectInQueue* obj) {
Y_ENSURE_EX(Add(obj), TThreadPoolException() << TStringBuf("can not add object to queue"));
-}
+}
void IThreadPool::SafeAddAndOwn(THolder<IObjectInQueue> obj) {
Y_ENSURE_EX(AddAndOwn(std::move(obj)), TThreadPoolException() << TStringBuf("can not add to queue and own"));
@@ -678,87 +678,87 @@ bool IThreadPool::AddAndOwn(THolder<IObjectInQueue> obj) {
using IThread = IThreadFactory::IThread;
using IThreadAble = IThreadFactory::IThreadAble;
-
-namespace {
- class TPoolThread: public IThread {
- class TThreadImpl: public IObjectInQueue, public TAtomicRefCount<TThreadImpl> {
- public:
- inline TThreadImpl(IThreadAble* func)
- : Func_(func)
- {
- }
-
+
+namespace {
+ class TPoolThread: public IThread {
+ class TThreadImpl: public IObjectInQueue, public TAtomicRefCount<TThreadImpl> {
+ public:
+ inline TThreadImpl(IThreadAble* func)
+ : Func_(func)
+ {
+ }
+
~TThreadImpl() override = default;
-
+
inline void WaitForStart() noexcept {
- StartEvent_.Wait();
- }
-
+ StartEvent_.Wait();
+ }
+
inline void WaitForComplete() noexcept {
- CompleteEvent_.Wait();
- }
-
- private:
+ CompleteEvent_.Wait();
+ }
+
+ private:
void Process(void* /*tsr*/) override {
- TThreadImplRef This(this);
-
- {
- StartEvent_.Signal();
-
- try {
- Func_->Execute();
- } catch (...) {
- // ¯\_(ツ)_/¯
- }
-
- CompleteEvent_.Signal();
- }
- }
-
- private:
- IThreadAble* Func_;
+ TThreadImplRef This(this);
+
+ {
+ StartEvent_.Signal();
+
+ try {
+ Func_->Execute();
+ } catch (...) {
+ // ¯\_(ツ)_/¯
+ }
+
+ CompleteEvent_.Signal();
+ }
+ }
+
+ private:
+ IThreadAble* Func_;
TSystemEvent CompleteEvent_;
TSystemEvent StartEvent_;
- };
-
+ };
+
using TThreadImplRef = TIntrusivePtr<TThreadImpl>;
-
- public:
+
+ public:
inline TPoolThread(IThreadPool* parent)
- : Parent_(parent)
- {
- }
-
+ : Parent_(parent)
+ {
+ }
+
~TPoolThread() override {
- if (Impl_) {
- Impl_->WaitForStart();
- }
- }
-
- private:
+ if (Impl_) {
+ Impl_->WaitForStart();
+ }
+ }
+
+ private:
void DoRun(IThreadAble* func) override {
- TThreadImplRef impl(new TThreadImpl(func));
-
- Parent_->SafeAdd(impl.Get());
- Impl_.Swap(impl);
- }
-
+ TThreadImplRef impl(new TThreadImpl(func));
+
+ Parent_->SafeAdd(impl.Get());
+ Impl_.Swap(impl);
+ }
+
void DoJoin() noexcept override {
- if (Impl_) {
- Impl_->WaitForComplete();
+ if (Impl_) {
+ Impl_->WaitForComplete();
Impl_ = nullptr;
- }
- }
-
- private:
+ }
+ }
+
+ private:
IThreadPool* Parent_;
- TThreadImplRef Impl_;
- };
-}
-
+ TThreadImplRef Impl_;
+ };
+}
+
IThread* IThreadPool::DoCreate() {
- return new TPoolThread(this);
-}
+ return new TPoolThread(this);
+}
THolder<IThreadPool> CreateThreadPool(size_t threadsCount, size_t queueSizeLimit, const TThreadPoolParams& params) {
THolder<IThreadPool> queue;
diff --git a/util/thread/pool.h b/util/thread/pool.h
index d1ea3a67cb..e2a2a03968 100644
--- a/util/thread/pool.h
+++ b/util/thread/pool.h
@@ -2,19 +2,19 @@
#include "fwd.h"
#include "factory.h"
-
-#include <util/system/yassert.h>
+
+#include <util/system/yassert.h>
#include <util/system/defaults.h>
#include <util/generic/yexception.h>
#include <util/generic/ptr.h>
-#include <util/generic/noncopyable.h>
+#include <util/generic/noncopyable.h>
#include <functional>
class TDuration;
-struct IObjectInQueue {
+struct IObjectInQueue {
virtual ~IObjectInQueue() = default;
-
+
/**
* Supposed to be implemented by user, to define jobs processed
* in multiple threads.
@@ -32,38 +32,38 @@ struct IObjectInQueue {
* Useful only for creators of new queue classes.
*/
class TThreadFactoryHolder {
-public:
+public:
TThreadFactoryHolder() noexcept;
-
+
inline TThreadFactoryHolder(IThreadFactory* pool) noexcept
- : Pool_(pool)
- {
- }
-
+ : Pool_(pool)
+ {
+ }
+
inline ~TThreadFactoryHolder() = default;
-
+
inline IThreadFactory* Pool() const noexcept {
- return Pool_;
- }
-
-private:
+ return Pool_;
+ }
+
+private:
IThreadFactory* Pool_;
-};
-
+};
+
class TThreadPoolException: public yexception {
};
-template <class T>
-class TThrFuncObj: public IObjectInQueue {
+template <class T>
+class TThrFuncObj: public IObjectInQueue {
public:
TThrFuncObj(const T& func)
- : Func(func)
- {
+ : Func(func)
+ {
}
TThrFuncObj(T&& func)
- : Func(std::move(func))
- {
+ : Func(std::move(func))
+ {
}
void Process(void*) override {
@@ -75,7 +75,7 @@ private:
T Func;
};
-template <class T>
+template <class T>
IObjectInQueue* MakeThrFuncObj(T&& func) {
return new TThrFuncObj<std::remove_cv_t<std::remove_reference_t<T>>>(std::forward<T>(func));
}
@@ -134,10 +134,10 @@ struct TThreadPoolParams {
};
/**
- * A queue processed simultaneously by several threads
+ * A queue processed simultaneously by several threads
*/
class IThreadPool: public IThreadFactory, public TNonCopyable {
-public:
+public:
using TParams = TThreadPoolParams;
~IThreadPool() override = default;
@@ -148,7 +148,7 @@ public:
*/
void SafeAdd(IObjectInQueue* obj);
- template <class T>
+ template <class T>
void SafeAddFunc(T&& func) {
Y_ENSURE_EX(AddFunc(std::forward<T>(func)), TThreadPoolException() << TStringBuf("can not add function to queue"));
}
@@ -161,9 +161,9 @@ public:
* @return true of obj is successfully added to queue
* @return false if queue is full or shutting down
*/
- virtual bool Add(IObjectInQueue* obj) Y_WARN_UNUSED_RESULT = 0;
+ virtual bool Add(IObjectInQueue* obj) Y_WARN_UNUSED_RESULT = 0;
- template <class T>
+ template <class T>
Y_WARN_UNUSED_RESULT bool AddFunc(T&& func) {
THolder<IObjectInQueue> wrapper(MakeThrFuncObj(std::forward<T>(func)));
bool added = Add(wrapper.Get());
@@ -185,76 +185,76 @@ public:
* RAII wrapper for Create/DestroyThreadSpecificResource.
* Useful only for implementers of new IThreadPool queues.
*/
- class TTsr {
- public:
+ class TTsr {
+ public:
inline TTsr(IThreadPool* q)
- : Q_(q)
- , Data_(Q_->CreateThreadSpecificResource())
- {
- }
-
+ : Q_(q)
+ , Data_(Q_->CreateThreadSpecificResource())
+ {
+ }
+
inline ~TTsr() {
- try {
- Q_->DestroyThreadSpecificResource(Data_);
- } catch (...) {
- // ¯\_(ツ)_/¯
- }
- }
-
+ try {
+ Q_->DestroyThreadSpecificResource(Data_);
+ } catch (...) {
+ // ¯\_(ツ)_/¯
+ }
+ }
+
inline operator void*() noexcept {
- return Data_;
- }
-
- private:
+ return Data_;
+ }
+
+ private:
IThreadPool* Q_;
- void* Data_;
- };
-
+ void* Data_;
+ };
+
/**
* CreateThreadSpecificResource and DestroyThreadSpecificResource
* called from internals of (TAdaptiveThreadPool, TThreadPool, ...) implementation,
* not by user of IThreadPool interface.
* Created resource is passed to IObjectInQueue::Proccess function.
*/
- virtual void* CreateThreadSpecificResource() {
+ virtual void* CreateThreadSpecificResource() {
return nullptr;
- }
-
- virtual void DestroyThreadSpecificResource(void* resource) {
+ }
+
+ virtual void DestroyThreadSpecificResource(void* resource) {
if (resource != nullptr) {
Y_ASSERT(resource == nullptr);
- }
- }
-
-private:
+ }
+ }
+
+private:
IThread* DoCreate() override;
-};
-
+};
+
/**
* Single-threaded implementation of IThreadPool, process tasks in same thread when
* added.
* Can be used to remove multithreading.
*/
class TFakeThreadPool: public IThreadPool {
-public:
+public:
bool Add(IObjectInQueue* pObj) override Y_WARN_UNUSED_RESULT {
- TTsr tsr(this);
- pObj->Process(tsr);
-
- return true;
- }
-
+ TTsr tsr(this);
+ pObj->Process(tsr);
+
+ return true;
+ }
+
void Start(size_t, size_t = 0) override {
- }
-
+ }
+
void Stop() noexcept override {
- }
-
+ }
+
size_t Size() const noexcept override {
- return 0;
- }
-};
-
+ return 0;
+ }
+};
+
class TThreadPoolBase: public IThreadPool, public TThreadFactoryHolder {
public:
TThreadPoolBase(const TParams& params);
@@ -265,10 +265,10 @@ protected:
/** queue processed by fixed size thread pool */
class TThreadPool: public TThreadPoolBase {
-public:
+public:
TThreadPool(const TParams& params = {});
~TThreadPool() override;
-
+
bool Add(IObjectInQueue* obj) override Y_WARN_UNUSED_RESULT;
/**
* @param queueSizeLimit means "unlimited" when = 0
@@ -280,45 +280,45 @@ public:
size_t GetThreadCountExpected() const noexcept;
size_t GetThreadCountReal() const noexcept;
size_t GetMaxQueueSize() const noexcept;
-
-private:
- class TImpl;
- THolder<TImpl> Impl_;
-};
-
+
+private:
+ class TImpl;
+ THolder<TImpl> Impl_;
+};
+
/**
* Always create new thread for new task, when all existing threads are busy.
* Maybe dangerous, number of threads is not limited.
*/
class TAdaptiveThreadPool: public TThreadPoolBase {
-public:
+public:
TAdaptiveThreadPool(const TParams& params = {});
~TAdaptiveThreadPool() override;
-
+
/**
* If working thread waits task too long (more then interval parameter),
* then the thread would be killed. Default value - infinity, all created threads
* waits for new task forever, before Stop.
*/
- void SetMaxIdleTime(TDuration interval);
-
+ void SetMaxIdleTime(TDuration interval);
+
bool Add(IObjectInQueue* obj) override Y_WARN_UNUSED_RESULT;
/** @param thrnum, @param maxque are ignored */
void Start(size_t thrnum = 0, size_t maxque = 0) override;
void Stop() noexcept override;
size_t Size() const noexcept override;
-
+
private:
- class TImpl;
- THolder<TImpl> Impl_;
+ class TImpl;
+ THolder<TImpl> Impl_;
};
/** Behave like TThreadPool or TAdaptiveThreadPool, choosen by thrnum parameter of Start() */
class TSimpleThreadPool: public TThreadPoolBase {
-public:
+public:
TSimpleThreadPool(const TParams& params = {});
~TSimpleThreadPool() override;
-
+
bool Add(IObjectInQueue* obj) override Y_WARN_UNUSED_RESULT;
/**
* @parameter thrnum. If thrnum is 0, use TAdaptiveThreadPool with small
@@ -327,11 +327,11 @@ public:
void Start(size_t thrnum, size_t maxque = 0) override;
void Stop() noexcept override;
size_t Size() const noexcept override;
-
-private:
+
+private:
THolder<IThreadPool> Slave_;
-};
-
+};
+
/**
* Helper to override virtual functions Create/DestroyThreadSpecificResource
* from IThreadPool and implement them using functions with same name from
@@ -339,46 +339,46 @@ private:
*/
template <class TQueueType, class TSlave>
class TThreadPoolBinder: public TQueueType {
-public:
+public:
inline TThreadPoolBinder(TSlave* slave)
- : Slave_(slave)
- {
- }
-
- template <class... Args>
- inline TThreadPoolBinder(TSlave* slave, Args&&... args)
+ : Slave_(slave)
+ {
+ }
+
+ template <class... Args>
+ inline TThreadPoolBinder(TSlave* slave, Args&&... args)
: TQueueType(std::forward<Args>(args)...)
- , Slave_(slave)
- {
- }
-
+ , Slave_(slave)
+ {
+ }
+
inline TThreadPoolBinder(TSlave& slave)
- : Slave_(&slave)
- {
- }
-
+ : Slave_(&slave)
+ {
+ }
+
~TThreadPoolBinder() override {
- try {
- this->Stop();
- } catch (...) {
- // ¯\_(ツ)_/¯
- }
- }
-
+ try {
+ this->Stop();
+ } catch (...) {
+ // ¯\_(ツ)_/¯
+ }
+ }
+
void* CreateThreadSpecificResource() override {
- return Slave_->CreateThreadSpecificResource();
- }
-
+ return Slave_->CreateThreadSpecificResource();
+ }
+
void DestroyThreadSpecificResource(void* resource) override {
- Slave_->DestroyThreadSpecificResource(resource);
- }
-
-private:
- TSlave* Slave_;
-};
-
+ Slave_->DestroyThreadSpecificResource(resource);
+ }
+
+private:
+ TSlave* Slave_;
+};
+
inline void Delete(THolder<IThreadPool> q) {
- if (q.Get()) {
+ if (q.Get()) {
q->Stop();
}
}
diff --git a/util/thread/pool_ut.cpp b/util/thread/pool_ut.cpp
index 893770d0c4..200bd89060 100644
--- a/util/thread/pool_ut.cpp
+++ b/util/thread/pool_ut.cpp
@@ -1,10 +1,10 @@
#include "pool.h"
-
+
#include <library/cpp/testing/unittest/registar.h>
#include <util/stream/output.h>
-#include <util/random/fast.h>
-#include <util/system/spinlock.h>
+#include <util/random/fast.h>
+#include <util/system/spinlock.h>
#include <util/system/thread.h>
#include <util/system/mutex.h>
#include <util/system/condvar.h>
@@ -12,26 +12,26 @@
struct TThreadPoolTest {
TSpinLock Lock;
long R = -1;
-
- struct TTask: public IObjectInQueue {
+
+ struct TTask: public IObjectInQueue {
TThreadPoolTest* Test = nullptr;
long Value = 0;
-
+
TTask(TThreadPoolTest* test, int value)
- : Test(test)
- , Value(value)
- {
- }
-
+ : Test(test)
+ , Value(value)
+ {
+ }
+
void Process(void*) override {
THolder<TTask> This(this);
-
+
TGuard<TSpinLock> guard(Test->Lock);
Test->R ^= Value;
- }
- };
-
- struct TOwnedTask: public IObjectInQueue {
+ }
+ };
+
+ struct TOwnedTask: public IObjectInQueue {
bool& Processed;
bool& Destructed;
@@ -51,40 +51,40 @@ struct TThreadPoolTest {
};
inline void TestAnyQueue(IThreadPool* queue, size_t queueSize = 1000) {
- TReallyFastRng32 rand(17);
- const size_t cnt = 1000;
-
+ TReallyFastRng32 rand(17);
+ const size_t cnt = 1000;
+
R = 0;
-
- for (size_t i = 0; i < cnt; ++i) {
- R ^= (long)rand.GenRand();
+
+ for (size_t i = 0; i < cnt; ++i) {
+ R ^= (long)rand.GenRand();
+ }
+
+ queue->Start(10, queueSize);
+ rand = TReallyFastRng32(17);
+
+ for (size_t i = 0; i < cnt; ++i) {
+ UNIT_ASSERT(queue->Add(new TTask(this, (long)rand.GenRand())));
}
-
- queue->Start(10, queueSize);
- rand = TReallyFastRng32(17);
-
- for (size_t i = 0; i < cnt; ++i) {
- UNIT_ASSERT(queue->Add(new TTask(this, (long)rand.GenRand())));
- }
-
- queue->Stop();
-
+
+ queue->Stop();
+
UNIT_ASSERT_EQUAL(0, R);
- }
+ }
};
class TFailAddQueue: public IThreadPool {
public:
bool Add(IObjectInQueue* /*obj*/) override Y_WARN_UNUSED_RESULT {
return false;
- }
+ }
void Start(size_t, size_t) override {
}
-
+
void Stop() noexcept override {
- }
-
+ }
+
size_t Size() const noexcept override {
return 0;
}
@@ -111,7 +111,7 @@ Y_UNIT_TEST_SUITE(TThreadPoolTest) {
TAdaptiveThreadPool q;
t.TestAnyQueue(&q);
}
- }
+ }
Y_UNIT_TEST(TestAddAndOwn) {
TThreadPool q;
@@ -128,8 +128,8 @@ Y_UNIT_TEST_SUITE(TThreadPoolTest) {
Y_UNIT_TEST(TestAddFunc) {
TFailAddQueue queue;
bool added = queue.AddFunc(
- []() {} // Lambda, I call him 'Lambda'!
- );
+ []() {} // Lambda, I call him 'Lambda'!
+ );
UNIT_ASSERT_VALUES_EQUAL(added, false);
}
@@ -154,7 +154,7 @@ Y_UNIT_TEST_SUITE(TThreadPoolTest) {
TThreadPool queue(TThreadPool::TParams().SetBlocking(false).SetCatching(true));
queue.Start(2);
- queue.SafeAddFunc([data = TFailOnCopy()]() {});
+ queue.SafeAddFunc([data = TFailOnCopy()]() {});
queue.Stop();
}
@@ -179,7 +179,7 @@ Y_UNIT_TEST_SUITE(TThreadPoolTest) {
queue.Stop();
}
- void TestFixedThreadName(IThreadPool& pool, const TString& expectedName) {
+ void TestFixedThreadName(IThreadPool& pool, const TString& expectedName) {
pool.Start(1);
TString name;
pool.SafeAddFunc([&name]() {
@@ -204,7 +204,7 @@ Y_UNIT_TEST_SUITE(TThreadPoolTest) {
}
}
- void TestEnumeratedThreadName(IThreadPool& pool, const THashSet<TString>& expectedNames) {
+ void TestEnumeratedThreadName(IThreadPool& pool, const THashSet<TString>& expectedNames) {
pool.Start(expectedNames.size());
TMutex lock;
TCondVar allReady;
@@ -212,7 +212,7 @@ Y_UNIT_TEST_SUITE(TThreadPoolTest) {
THashSet<TString> names;
for (size_t i = 0; i < expectedNames.size(); ++i) {
pool.SafeAddFunc([&]() {
- with_lock (lock) {
+ with_lock (lock) {
if (++readyCount == expectedNames.size()) {
allReady.BroadCast();
} else {
diff --git a/util/thread/singleton.cpp b/util/thread/singleton.cpp
index a898bdc9d4..83bee4e88b 100644
--- a/util/thread/singleton.cpp
+++ b/util/thread/singleton.cpp
@@ -1 +1 @@
-#include "singleton.h"
+#include "singleton.h"
diff --git a/util/thread/singleton.h b/util/thread/singleton.h
index 4a1e05aea0..a9af366005 100644
--- a/util/thread/singleton.h
+++ b/util/thread/singleton.h
@@ -4,38 +4,38 @@
#include <util/generic/singleton.h>
#include <util/generic/ptr.h>
-namespace NPrivate {
- template <class T, size_t Priority>
- struct TFastThreadSingletonHelper {
- static inline T* GetSlow() {
- return SingletonWithPriority<NTls::TValue<T>, Priority>()->GetPtr();
- }
+namespace NPrivate {
+ template <class T, size_t Priority>
+ struct TFastThreadSingletonHelper {
+ static inline T* GetSlow() {
+ return SingletonWithPriority<NTls::TValue<T>, Priority>()->GetPtr();
+ }
- static inline T* Get() {
+ static inline T* Get() {
#if defined(Y_HAVE_FAST_POD_TLS)
- Y_POD_STATIC_THREAD(T*) fast(nullptr);
+ Y_POD_STATIC_THREAD(T*) fast(nullptr);
if (Y_UNLIKELY(!fast)) {
- fast = GetSlow();
- }
-
- return fast;
-#else
- return GetSlow();
-#endif
- }
- };
+ fast = GetSlow();
+ }
+
+ return fast;
+#else
+ return GetSlow();
+#endif
+ }
+ };
}
-template <class T, size_t Priority>
-static inline T* FastTlsSingletonWithPriority() {
- return ::NPrivate::TFastThreadSingletonHelper<T, Priority>::Get();
+template <class T, size_t Priority>
+static inline T* FastTlsSingletonWithPriority() {
+ return ::NPrivate::TFastThreadSingletonHelper<T, Priority>::Get();
}
// NB: the singleton is the same for all modules that use
// FastTlsSingleton with the same type parameter. If unique singleton
// required, use unique types.
-template <class T>
-static inline T* FastTlsSingleton() {
- return FastTlsSingletonWithPriority<T, TSingletonTraits<T>::Priority>();
-}
+template <class T>
+static inline T* FastTlsSingleton() {
+ return FastTlsSingletonWithPriority<T, TSingletonTraits<T>::Priority>();
+}
diff --git a/util/thread/singleton_ut.cpp b/util/thread/singleton_ut.cpp
index 164b1cc184..0ad4d395a6 100644
--- a/util/thread/singleton_ut.cpp
+++ b/util/thread/singleton_ut.cpp
@@ -7,8 +7,8 @@ namespace {
int i;
TFoo()
: i(0)
- {
- }
+ {
+ }
};
}
diff --git a/util/thread/ut/ya.make b/util/thread/ut/ya.make
index 93198bfaf1..102c21429f 100644
--- a/util/thread/ut/ya.make
+++ b/util/thread/ut/ya.make
@@ -1,14 +1,14 @@
-UNITTEST_FOR(util)
+UNITTEST_FOR(util)
OWNER(g:util)
SUBSCRIBER(g:util-subscribers)
SRCS(
thread/factory_ut.cpp
- thread/lfqueue_ut.cpp
- thread/lfstack_ut.cpp
+ thread/lfqueue_ut.cpp
+ thread/lfstack_ut.cpp
thread/pool_ut.cpp
- thread/singleton_ut.cpp
+ thread/singleton_ut.cpp
)
PEERDIR(