aboutsummaryrefslogtreecommitdiffstats
path: root/util
diff options
context:
space:
mode:
authorivanzhukov <ivanzhukov@yandex-team.ru>2022-02-10 16:49:40 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:49:40 +0300
commit0892d79ab411592ad25175c4bdadbcb09b466cf5 (patch)
tree98dfdd45463c9bd747101748a9ca25d2917390fd /util
parent1b7466cb957659079ebebbb5d76e64e51f3306f0 (diff)
downloadydb-0892d79ab411592ad25175c4bdadbcb09b466cf5.tar.gz
Restoring authorship annotation for <ivanzhukov@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'util')
-rw-r--r--util/generic/typetraits.h18
-rw-r--r--util/thread/lfqueue.h124
-rw-r--r--util/thread/lfqueue_ut.cpp226
-rw-r--r--util/thread/pool.cpp50
-rw-r--r--util/thread/pool.h6
-rw-r--r--util/thread/pool_ut.cpp38
6 files changed, 231 insertions, 231 deletions
diff --git a/util/generic/typetraits.h b/util/generic/typetraits.h
index d165bd1a06..493813c943 100644
--- a/util/generic/typetraits.h
+++ b/util/generic/typetraits.h
@@ -268,18 +268,18 @@ struct TIsSpecializationOf: std::false_type {};
template <template <class...> class T, class... Ts>
struct TIsSpecializationOf<T, T<Ts...>>: std::true_type {};
-
-/*
+
+/*
* TDependentFalse is a constant dependent on a template parameter.
* Use it in static_assert in a false branch of if constexpr to produce a compile error.
- * See an example with dependent_false at https://en.cppreference.com/w/cpp/language/if
- *
- * if constexpr (std::is_same<T, someType1>) {
- * } else if constexpr (std::is_same<T, someType2>) {
- * } else {
+ * See an example with dependent_false at https://en.cppreference.com/w/cpp/language/if
+ *
+ * if constexpr (std::is_same<T, someType1>) {
+ * } else if constexpr (std::is_same<T, someType2>) {
+ * } else {
* static_assert(TDependentFalse<T>, "unknown type");
- * }
- */
+ * }
+ */
template <typename... T>
constexpr bool TDependentFalse = false;
diff --git a/util/thread/lfqueue.h b/util/thread/lfqueue.h
index ab523631e4..c62596d586 100644
--- a/util/thread/lfqueue.h
+++ b/util/thread/lfqueue.h
@@ -64,7 +64,7 @@ class TLockFreeQueue: public TNonCopyable {
static void EraseList(TListNode* n) {
while (n) {
- TListNode* keepNext = AtomicGet(n->Next);
+ TListNode* keepNext = AtomicGet(n->Next);
delete n;
n = keepNext;
}
@@ -151,7 +151,7 @@ class TLockFreeQueue: public TNonCopyable {
while (ptr) {
if (ptr == PrevFirst) {
// short cut, we have copied this part already
- AtomicSet(Tail->Next, newCopy);
+ AtomicSet(Tail->Next, newCopy);
newCopy = Copy;
Copy = nullptr; // do not destroy prev try
if (!newTail)
@@ -160,7 +160,7 @@ class TLockFreeQueue: public TNonCopyable {
}
TListNode* newElem = new TListNode(ptr->Data, newCopy);
newCopy = newElem;
- ptr = AtomicGet(ptr->Next);
+ ptr = AtomicGet(ptr->Next);
if (!newTail)
newTail = newElem;
}
@@ -177,11 +177,11 @@ class TLockFreeQueue: public TNonCopyable {
AtomicSet(newRoot->PushQueue, head);
for (;;) {
TRootNode* curRoot = AtomicGet(JobQueue);
- AtomicSet(tail->Next, AtomicGet(curRoot->PushQueue));
+ AtomicSet(tail->Next, AtomicGet(curRoot->PushQueue));
AtomicSet(newRoot->PopQueue, AtomicGet(curRoot->PopQueue));
newRoot->CopyCounter(curRoot);
- for (TListNode* node = head;; node = AtomicGet(node->Next)) {
+ for (TListNode* node = head;; node = AtomicGet(node->Next)) {
newRoot->IncCount(node->Data);
if (node == tail)
break;
@@ -194,36 +194,36 @@ class TLockFreeQueue: public TNonCopyable {
}
}
- template <typename TCollection>
- static void FillCollection(TListNode* lst, TCollection* res) {
- while (lst) {
- res->emplace_back(std::move(lst->Data));
- lst = AtomicGet(lst->Next);
- }
- }
-
- /** Traverses a given list simultaneously creating its inversed version.
- * After that, fills a collection with a reversed version and returns the last visited lst's node.
- */
- template <typename TCollection>
- static TListNode* FillCollectionReverse(TListNode* lst, TCollection* res) {
- if (!lst) {
- return nullptr;
- }
-
- TListNode* newCopy = nullptr;
- do {
- TListNode* newElem = new TListNode(std::move(lst->Data), newCopy);
- newCopy = newElem;
- lst = AtomicGet(lst->Next);
- } while (lst);
-
- FillCollection(newCopy, res);
- EraseList(newCopy);
-
- return lst;
- }
-
+ template <typename TCollection>
+ static void FillCollection(TListNode* lst, TCollection* res) {
+ while (lst) {
+ res->emplace_back(std::move(lst->Data));
+ lst = AtomicGet(lst->Next);
+ }
+ }
+
+ /** Traverses a given list simultaneously creating its inversed version.
+ * After that, fills a collection with a reversed version and returns the last visited lst's node.
+ */
+ template <typename TCollection>
+ static TListNode* FillCollectionReverse(TListNode* lst, TCollection* res) {
+ if (!lst) {
+ return nullptr;
+ }
+
+ TListNode* newCopy = nullptr;
+ do {
+ TListNode* newElem = new TListNode(std::move(lst->Data), newCopy);
+ newCopy = newElem;
+ lst = AtomicGet(lst->Next);
+ } while (lst);
+
+ FillCollection(newCopy, res);
+ EraseList(newCopy);
+
+ return lst;
+ }
+
public:
TLockFreeQueue()
: JobQueue(new TRootNode)
@@ -284,13 +284,13 @@ public:
newRoot = new TRootNode;
AtomicSet(newRoot->PushQueue, AtomicGet(curRoot->PushQueue));
- AtomicSet(newRoot->PopQueue, AtomicGet(tail->Next));
+ AtomicSet(newRoot->PopQueue, AtomicGet(tail->Next));
newRoot->CopyCounter(curRoot);
newRoot->DecCount(tail->Data);
Y_ASSERT(AtomicGet(curRoot->PopQueue) == tail);
if (AtomicCas(&JobQueue, newRoot, curRoot)) {
*data = std::move(tail->Data);
- AtomicSet(tail->Next, nullptr);
+ AtomicSet(tail->Next, nullptr);
AsyncUnref(curRoot, tail);
return true;
}
@@ -318,31 +318,31 @@ public:
}
}
}
- template <typename TCollection>
- void DequeueAll(TCollection* res) {
- AsyncRef();
-
- TRootNode* newRoot = new TRootNode;
- TRootNode* curRoot;
- do {
- curRoot = AtomicGet(JobQueue);
- } while (!AtomicCas(&JobQueue, newRoot, curRoot));
-
- FillCollection(curRoot->PopQueue, res);
-
- TListNode* toDeleteHead = curRoot->PushQueue;
- TListNode* toDeleteTail = FillCollectionReverse(curRoot->PushQueue, res);
- AtomicSet(curRoot->PushQueue, nullptr);
-
- if (toDeleteTail) {
- toDeleteTail->Next = curRoot->PopQueue;
- } else {
- toDeleteTail = curRoot->PopQueue;
- }
- AtomicSet(curRoot->PopQueue, nullptr);
-
- AsyncUnref(curRoot, toDeleteHead);
- }
+ template <typename TCollection>
+ void DequeueAll(TCollection* res) {
+ AsyncRef();
+
+ TRootNode* newRoot = new TRootNode;
+ TRootNode* curRoot;
+ do {
+ curRoot = AtomicGet(JobQueue);
+ } while (!AtomicCas(&JobQueue, newRoot, curRoot));
+
+ FillCollection(curRoot->PopQueue, res);
+
+ TListNode* toDeleteHead = curRoot->PushQueue;
+ TListNode* toDeleteTail = FillCollectionReverse(curRoot->PushQueue, res);
+ AtomicSet(curRoot->PushQueue, nullptr);
+
+ if (toDeleteTail) {
+ toDeleteTail->Next = curRoot->PopQueue;
+ } else {
+ toDeleteTail = curRoot->PopQueue;
+ }
+ AtomicSet(curRoot->PopQueue, nullptr);
+
+ AsyncUnref(curRoot, toDeleteHead);
+ }
bool IsEmpty() {
AsyncRef();
TRootNode* curRoot = AtomicGet(JobQueue);
diff --git a/util/thread/lfqueue_ut.cpp b/util/thread/lfqueue_ut.cpp
index 83bca100cf..96905a4c84 100644
--- a/util/thread/lfqueue_ut.cpp
+++ b/util/thread/lfqueue_ut.cpp
@@ -1,11 +1,11 @@
#include <library/cpp/threading/future/future.h>
#include <library/cpp/testing/unittest/registar.h>
-#include <util/generic/algorithm.h>
+#include <util/generic/algorithm.h>
#include <util/generic/vector.h>
#include <util/generic/ptr.h>
-#include <util/system/atomic.h>
-#include <util/thread/pool.h>
+#include <util/system/atomic.h>
+#include <util/thread/pool.h>
#include "lfqueue.h"
@@ -191,117 +191,117 @@ Y_UNIT_TEST_SUITE(TLockFreeQueueTests) {
}
void DequeueAllRunner(TLockFreeQueue<int>& queue, bool singleConsumer) {
- size_t threadsNum = 4;
- size_t enqueuesPerThread = 10'000;
- TThreadPool p;
- p.Start(threadsNum, 0);
-
- TVector<NThreading::TFuture<void>> futures;
-
- for (size_t i = 0; i < threadsNum; ++i) {
- NThreading::TPromise<void> promise = NThreading::NewPromise();
- futures.emplace_back(promise.GetFuture());
-
- p.SafeAddFunc([enqueuesPerThread, &queue, promise]() mutable {
- for (size_t i = 0; i != enqueuesPerThread; ++i) {
- queue.Enqueue(i);
- }
-
- promise.SetValue();
- });
- }
-
- TAtomic elementsLeft;
- AtomicSet(elementsLeft, threadsNum * enqueuesPerThread);
-
- ui64 numOfConsumers = singleConsumer ? 1 : threadsNum;
-
- TVector<TVector<int>> dataBuckets(numOfConsumers);
-
- for (size_t i = 0; i < numOfConsumers; ++i) {
- NThreading::TPromise<void> promise = NThreading::NewPromise();
- futures.emplace_back(promise.GetFuture());
-
- p.SafeAddFunc([&queue, &elementsLeft, promise, consumerData{&dataBuckets[i]}]() mutable {
- TVector<int> vec;
- while (static_cast<i64>(AtomicGet(elementsLeft)) > 0) {
- for (size_t i = 0; i != 100; ++i) {
- vec.clear();
- queue.DequeueAll(&vec);
-
- AtomicSub(elementsLeft, vec.size());
- consumerData->insert(consumerData->end(), vec.begin(), vec.end());
- }
- }
-
- promise.SetValue();
- });
- }
-
+ size_t threadsNum = 4;
+ size_t enqueuesPerThread = 10'000;
+ TThreadPool p;
+ p.Start(threadsNum, 0);
+
+ TVector<NThreading::TFuture<void>> futures;
+
+ for (size_t i = 0; i < threadsNum; ++i) {
+ NThreading::TPromise<void> promise = NThreading::NewPromise();
+ futures.emplace_back(promise.GetFuture());
+
+ p.SafeAddFunc([enqueuesPerThread, &queue, promise]() mutable {
+ for (size_t i = 0; i != enqueuesPerThread; ++i) {
+ queue.Enqueue(i);
+ }
+
+ promise.SetValue();
+ });
+ }
+
+ TAtomic elementsLeft;
+ AtomicSet(elementsLeft, threadsNum * enqueuesPerThread);
+
+ ui64 numOfConsumers = singleConsumer ? 1 : threadsNum;
+
+ TVector<TVector<int>> dataBuckets(numOfConsumers);
+
+ for (size_t i = 0; i < numOfConsumers; ++i) {
+ NThreading::TPromise<void> promise = NThreading::NewPromise();
+ futures.emplace_back(promise.GetFuture());
+
+ p.SafeAddFunc([&queue, &elementsLeft, promise, consumerData{&dataBuckets[i]}]() mutable {
+ TVector<int> vec;
+ while (static_cast<i64>(AtomicGet(elementsLeft)) > 0) {
+ for (size_t i = 0; i != 100; ++i) {
+ vec.clear();
+ queue.DequeueAll(&vec);
+
+ AtomicSub(elementsLeft, vec.size());
+ consumerData->insert(consumerData->end(), vec.begin(), vec.end());
+ }
+ }
+
+ promise.SetValue();
+ });
+ }
+
NThreading::WaitExceptionOrAll(futures).GetValueSync();
- p.Stop();
-
- TVector<int> left;
- queue.DequeueAll(&left);
-
- UNIT_ASSERT(left.empty());
-
- TVector<int> data;
- for (auto& dataBucket : dataBuckets) {
- data.insert(data.end(), dataBucket.begin(), dataBucket.end());
- }
-
- UNIT_ASSERT_EQUAL(data.size(), threadsNum * enqueuesPerThread);
-
- size_t threadIdx = 0;
- size_t cntValue = 0;
-
- Sort(data.begin(), data.end());
- for (size_t i = 0; i != data.size(); ++i) {
- UNIT_ASSERT_VALUES_EQUAL(cntValue, data[i]);
- ++threadIdx;
-
- if (threadIdx == threadsNum) {
- ++cntValue;
- threadIdx = 0;
- }
- }
- }
-
- Y_UNIT_TEST(TestDequeueAllSingleConsumer) {
- TLockFreeQueue<int> queue;
- DequeueAllRunner(queue, true);
- }
-
- Y_UNIT_TEST(TestDequeueAllMultipleConsumers) {
- TLockFreeQueue<int> queue;
- DequeueAllRunner(queue, false);
- }
-
- Y_UNIT_TEST(TestDequeueAllEmptyQueue) {
- TLockFreeQueue<int> queue;
- TVector<int> vec;
-
- queue.DequeueAll(&vec);
-
- UNIT_ASSERT(vec.empty());
- }
-
- Y_UNIT_TEST(TestDequeueAllQueueOrder) {
- TLockFreeQueue<int> queue;
- queue.Enqueue(1);
- queue.Enqueue(2);
- queue.Enqueue(3);
-
- TVector<int> v;
- queue.DequeueAll(&v);
-
- UNIT_ASSERT_VALUES_EQUAL(v.size(), 3);
- UNIT_ASSERT_VALUES_EQUAL(v[0], 1);
- UNIT_ASSERT_VALUES_EQUAL(v[1], 2);
- UNIT_ASSERT_VALUES_EQUAL(v[2], 3);
- }
-
+ p.Stop();
+
+ TVector<int> left;
+ queue.DequeueAll(&left);
+
+ UNIT_ASSERT(left.empty());
+
+ TVector<int> data;
+ for (auto& dataBucket : dataBuckets) {
+ data.insert(data.end(), dataBucket.begin(), dataBucket.end());
+ }
+
+ UNIT_ASSERT_EQUAL(data.size(), threadsNum * enqueuesPerThread);
+
+ size_t threadIdx = 0;
+ size_t cntValue = 0;
+
+ Sort(data.begin(), data.end());
+ for (size_t i = 0; i != data.size(); ++i) {
+ UNIT_ASSERT_VALUES_EQUAL(cntValue, data[i]);
+ ++threadIdx;
+
+ if (threadIdx == threadsNum) {
+ ++cntValue;
+ threadIdx = 0;
+ }
+ }
+ }
+
+ Y_UNIT_TEST(TestDequeueAllSingleConsumer) {
+ TLockFreeQueue<int> queue;
+ DequeueAllRunner(queue, true);
+ }
+
+ Y_UNIT_TEST(TestDequeueAllMultipleConsumers) {
+ TLockFreeQueue<int> queue;
+ DequeueAllRunner(queue, false);
+ }
+
+ Y_UNIT_TEST(TestDequeueAllEmptyQueue) {
+ TLockFreeQueue<int> queue;
+ TVector<int> vec;
+
+ queue.DequeueAll(&vec);
+
+ UNIT_ASSERT(vec.empty());
+ }
+
+ Y_UNIT_TEST(TestDequeueAllQueueOrder) {
+ TLockFreeQueue<int> queue;
+ queue.Enqueue(1);
+ queue.Enqueue(2);
+ queue.Enqueue(3);
+
+ TVector<int> v;
+ queue.DequeueAll(&v);
+
+ UNIT_ASSERT_VALUES_EQUAL(v.size(), 3);
+ UNIT_ASSERT_VALUES_EQUAL(v[0], 1);
+ UNIT_ASSERT_VALUES_EQUAL(v[1], 2);
+ UNIT_ASSERT_VALUES_EQUAL(v[2], 3);
+ }
+
Y_UNIT_TEST(CleanInDestructor) {
TSimpleSharedPtr<bool> p(new bool);
UNIT_ASSERT_VALUES_EQUAL(1u, p.RefCount());
diff --git a/util/thread/pool.cpp b/util/thread/pool.cpp
index 05fad02e9b..f3c4ecc7bc 100644
--- a/util/thread/pool.cpp
+++ b/util/thread/pool.cpp
@@ -143,10 +143,10 @@ public:
return ThreadCountExpected;
}
- inline size_t GetThreadCountReal() const noexcept {
- return ThreadCountReal;
- }
-
+ inline size_t GetThreadCountReal() const noexcept {
+ return ThreadCountReal;
+ }
+
inline void AtforkAction() noexcept Y_NO_SANITIZE("thread") {
Forked = true;
}
@@ -326,29 +326,29 @@ size_t TThreadPool::Size() const noexcept {
}
size_t TThreadPool::GetThreadCountExpected() const noexcept {
- if (!Impl_.Get()) {
- return 0;
- }
-
- return Impl_->GetThreadCountExpected();
-}
-
+ if (!Impl_.Get()) {
+ return 0;
+ }
+
+ return Impl_->GetThreadCountExpected();
+}
+
size_t TThreadPool::GetThreadCountReal() const noexcept {
- if (!Impl_.Get()) {
- return 0;
- }
-
- return Impl_->GetThreadCountReal();
-}
-
+ if (!Impl_.Get()) {
+ return 0;
+ }
+
+ return Impl_->GetThreadCountReal();
+}
+
size_t TThreadPool::GetMaxQueueSize() const noexcept {
- if (!Impl_.Get()) {
- return 0;
- }
-
- return Impl_->GetMaxQueueSize();
-}
-
+ if (!Impl_.Get()) {
+ return 0;
+ }
+
+ return Impl_->GetMaxQueueSize();
+}
+
bool TThreadPool::Add(IObjectInQueue* obj) {
Y_ENSURE_EX(Impl_.Get(), TThreadPoolException() << TStringBuf("mtp queue not started"));
diff --git a/util/thread/pool.h b/util/thread/pool.h
index d1ea3a67cb..da6a475038 100644
--- a/util/thread/pool.h
+++ b/util/thread/pool.h
@@ -277,9 +277,9 @@ public:
void Start(size_t threadCount, size_t queueSizeLimit = 0) override;
void Stop() noexcept override;
size_t Size() const noexcept override;
- size_t GetThreadCountExpected() const noexcept;
- size_t GetThreadCountReal() const noexcept;
- size_t GetMaxQueueSize() const noexcept;
+ size_t GetThreadCountExpected() const noexcept;
+ size_t GetThreadCountReal() const noexcept;
+ size_t GetMaxQueueSize() const noexcept;
private:
class TImpl;
diff --git a/util/thread/pool_ut.cpp b/util/thread/pool_ut.cpp
index 893770d0c4..b44ea9694c 100644
--- a/util/thread/pool_ut.cpp
+++ b/util/thread/pool_ut.cpp
@@ -158,26 +158,26 @@ Y_UNIT_TEST_SUITE(TThreadPoolTest) {
queue.Stop();
}
-
- Y_UNIT_TEST(TestInfoGetters) {
+
+ Y_UNIT_TEST(TestInfoGetters) {
TThreadPool queue;
-
- queue.Start(2, 7);
-
- UNIT_ASSERT_EQUAL(queue.GetThreadCountExpected(), 2);
- UNIT_ASSERT_EQUAL(queue.GetThreadCountReal(), 2);
- UNIT_ASSERT_EQUAL(queue.GetMaxQueueSize(), 7);
-
- queue.Stop();
-
- queue.Start(4, 1);
-
- UNIT_ASSERT_EQUAL(queue.GetThreadCountExpected(), 4);
- UNIT_ASSERT_EQUAL(queue.GetThreadCountReal(), 4);
- UNIT_ASSERT_EQUAL(queue.GetMaxQueueSize(), 1);
-
- queue.Stop();
- }
+
+ queue.Start(2, 7);
+
+ UNIT_ASSERT_EQUAL(queue.GetThreadCountExpected(), 2);
+ UNIT_ASSERT_EQUAL(queue.GetThreadCountReal(), 2);
+ UNIT_ASSERT_EQUAL(queue.GetMaxQueueSize(), 7);
+
+ queue.Stop();
+
+ queue.Start(4, 1);
+
+ UNIT_ASSERT_EQUAL(queue.GetThreadCountExpected(), 4);
+ UNIT_ASSERT_EQUAL(queue.GetThreadCountReal(), 4);
+ UNIT_ASSERT_EQUAL(queue.GetMaxQueueSize(), 1);
+
+ queue.Stop();
+ }
void TestFixedThreadName(IThreadPool& pool, const TString& expectedName) {
pool.Start(1);