author      ivanzhukov <ivanzhukov@yandex-team.ru>          2022-02-10 16:49:40 +0300
committer   Daniil Cherednik <dcherednik@yandex-team.ru>    2022-02-10 16:49:40 +0300
commit      0892d79ab411592ad25175c4bdadbcb09b466cf5 (patch)
tree        98dfdd45463c9bd747101748a9ca25d2917390fd /util
parent      1b7466cb957659079ebebbb5d76e64e51f3306f0 (diff)
download    ydb-0892d79ab411592ad25175c4bdadbcb09b466cf5.tar.gz
Restoring authorship annotation for <ivanzhukov@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'util')
-rw-r--r--  util/generic/typetraits.h   |  18
-rw-r--r--  util/thread/lfqueue.h       | 124
-rw-r--r--  util/thread/lfqueue_ut.cpp  | 226
-rw-r--r--  util/thread/pool.cpp        |  50
-rw-r--r--  util/thread/pool.h          |   6
-rw-r--r--  util/thread/pool_ut.cpp     |  38
6 files changed, 231 insertions, 231 deletions
diff --git a/util/generic/typetraits.h b/util/generic/typetraits.h
index d165bd1a06..493813c943 100644
--- a/util/generic/typetraits.h
+++ b/util/generic/typetraits.h
@@ -268,18 +268,18 @@ struct TIsSpecializationOf: std::false_type {};
 template <template <class...> class T, class... Ts>
 struct TIsSpecializationOf<T, T<Ts...>>: std::true_type {};
-
-/*
+
+/*
  * TDependentFalse is a constant dependent on a template parameter.
  * Use it in static_assert in a false branch of if constexpr to produce a compile error.
- * See an example with dependent_false at https://en.cppreference.com/w/cpp/language/if
- *
- * if constexpr (std::is_same<T, someType1>) {
- * } else if constexpr (std::is_same<T, someType2>) {
- * } else {
+ * See an example with dependent_false at https://en.cppreference.com/w/cpp/language/if
+ *
+ * if constexpr (std::is_same<T, someType1>) {
+ * } else if constexpr (std::is_same<T, someType2>) {
+ * } else {
  *     static_assert(TDependentFalse<T>, "unknown type");
- * }
- */
+ * }
+ */
 template <typename... T>
 constexpr bool TDependentFalse = false;
diff --git a/util/thread/lfqueue.h b/util/thread/lfqueue.h
index ab523631e4..c62596d586 100644
--- a/util/thread/lfqueue.h
+++ b/util/thread/lfqueue.h
@@ -64,7 +64,7 @@ class TLockFreeQueue: public TNonCopyable {
     static void EraseList(TListNode* n) {
         while (n) {
-            TListNode* keepNext = AtomicGet(n->Next);
+            TListNode* keepNext = AtomicGet(n->Next);
             delete n;
             n = keepNext;
         }
@@ -151,7 +151,7 @@ class TLockFreeQueue: public TNonCopyable {
         while (ptr) {
             if (ptr == PrevFirst) {
                 // short cut, we have copied this part already
-                AtomicSet(Tail->Next, newCopy);
+                AtomicSet(Tail->Next, newCopy);
                 newCopy = Copy;
                 Copy = nullptr; // do not destroy prev try
                 if (!newTail)
@@ -160,7 +160,7 @@
             }
             TListNode* newElem = new TListNode(ptr->Data, newCopy);
             newCopy = newElem;
-            ptr = AtomicGet(ptr->Next);
+            ptr = AtomicGet(ptr->Next);
             if (!newTail)
                 newTail = newElem;
         }
@@ -177,11 +177,11 @@
         AtomicSet(newRoot->PushQueue, head);
         for (;;) {
             TRootNode* curRoot = AtomicGet(JobQueue);
-            AtomicSet(tail->Next, AtomicGet(curRoot->PushQueue));
+            AtomicSet(tail->Next, AtomicGet(curRoot->PushQueue));
             AtomicSet(newRoot->PopQueue, AtomicGet(curRoot->PopQueue));
             newRoot->CopyCounter(curRoot);
 
-            for (TListNode* node = head;; node = AtomicGet(node->Next)) {
+            for (TListNode* node = head;; node = AtomicGet(node->Next)) {
                 newRoot->IncCount(node->Data);
                 if (node == tail)
                     break;
@@ -194,36 +194,36 @@
             }
         }
     }
-    template <typename TCollection>
-    static void FillCollection(TListNode* lst, TCollection* res) {
-        while (lst) {
-            res->emplace_back(std::move(lst->Data));
-            lst = AtomicGet(lst->Next);
-        }
-    }
-
-    /** Traverses a given list simultaneously creating its inversed version.
-     * After that, fills a collection with a reversed version and returns the last visited lst's node.
-     */
-    template <typename TCollection>
-    static TListNode* FillCollectionReverse(TListNode* lst, TCollection* res) {
-        if (!lst) {
-            return nullptr;
-        }
-
-        TListNode* newCopy = nullptr;
-        do {
-            TListNode* newElem = new TListNode(std::move(lst->Data), newCopy);
-            newCopy = newElem;
-            lst = AtomicGet(lst->Next);
-        } while (lst);
-
-        FillCollection(newCopy, res);
-        EraseList(newCopy);
-
-        return lst;
-    }
-
+    template <typename TCollection>
+    static void FillCollection(TListNode* lst, TCollection* res) {
+        while (lst) {
+            res->emplace_back(std::move(lst->Data));
+            lst = AtomicGet(lst->Next);
+        }
+    }
+
+    /** Traverses a given list simultaneously creating its inversed version.
+     * After that, fills a collection with a reversed version and returns the last visited lst's node.
+     */
+    template <typename TCollection>
+    static TListNode* FillCollectionReverse(TListNode* lst, TCollection* res) {
+        if (!lst) {
+            return nullptr;
+        }
+
+        TListNode* newCopy = nullptr;
+        do {
+            TListNode* newElem = new TListNode(std::move(lst->Data), newCopy);
+            newCopy = newElem;
+            lst = AtomicGet(lst->Next);
+        } while (lst);
+
+        FillCollection(newCopy, res);
+        EraseList(newCopy);
+
+        return lst;
+    }
+
 public:
     TLockFreeQueue()
         : JobQueue(new TRootNode)
@@ -284,13 +284,13 @@ public:
                 newRoot = new TRootNode;
 
             AtomicSet(newRoot->PushQueue, AtomicGet(curRoot->PushQueue));
-            AtomicSet(newRoot->PopQueue, AtomicGet(tail->Next));
+            AtomicSet(newRoot->PopQueue, AtomicGet(tail->Next));
             newRoot->CopyCounter(curRoot);
             newRoot->DecCount(tail->Data);
             Y_ASSERT(AtomicGet(curRoot->PopQueue) == tail);
             if (AtomicCas(&JobQueue, newRoot, curRoot)) {
                 *data = std::move(tail->Data);
-                AtomicSet(tail->Next, nullptr);
+                AtomicSet(tail->Next, nullptr);
                 AsyncUnref(curRoot, tail);
                 return true;
             }
@@ -318,31 +318,31 @@ public:
             }
         }
     }
-    template <typename TCollection>
-    void DequeueAll(TCollection* res) {
-        AsyncRef();
-
-        TRootNode* newRoot = new TRootNode;
-        TRootNode* curRoot;
-        do {
-            curRoot = AtomicGet(JobQueue);
-        } while (!AtomicCas(&JobQueue, newRoot, curRoot));
-
-        FillCollection(curRoot->PopQueue, res);
-
-        TListNode* toDeleteHead = curRoot->PushQueue;
-        TListNode* toDeleteTail = FillCollectionReverse(curRoot->PushQueue, res);
-        AtomicSet(curRoot->PushQueue, nullptr);
-
-        if (toDeleteTail) {
-            toDeleteTail->Next = curRoot->PopQueue;
-        } else {
-            toDeleteTail = curRoot->PopQueue;
-        }
-        AtomicSet(curRoot->PopQueue, nullptr);
-
-        AsyncUnref(curRoot, toDeleteHead);
-    }
+    template <typename TCollection>
+    void DequeueAll(TCollection* res) {
+        AsyncRef();
+
+        TRootNode* newRoot = new TRootNode;
+        TRootNode* curRoot;
+        do {
+            curRoot = AtomicGet(JobQueue);
+        } while (!AtomicCas(&JobQueue, newRoot, curRoot));
+
+        FillCollection(curRoot->PopQueue, res);
+
+        TListNode* toDeleteHead = curRoot->PushQueue;
+        TListNode* toDeleteTail = FillCollectionReverse(curRoot->PushQueue, res);
+        AtomicSet(curRoot->PushQueue, nullptr);
+
+        if (toDeleteTail) {
+            toDeleteTail->Next = curRoot->PopQueue;
+        } else {
+            toDeleteTail = curRoot->PopQueue;
+        }
+        AtomicSet(curRoot->PopQueue, nullptr);
+
+        AsyncUnref(curRoot, toDeleteHead);
+    }
     bool IsEmpty() {
         AsyncRef();
         TRootNode* curRoot = AtomicGet(JobQueue);
diff --git a/util/thread/lfqueue_ut.cpp b/util/thread/lfqueue_ut.cpp
index 83bca100cf..96905a4c84 100644
--- a/util/thread/lfqueue_ut.cpp
+++ b/util/thread/lfqueue_ut.cpp
@@ -1,11 +1,11 @@
 #include <library/cpp/threading/future/future.h>
 #include <library/cpp/testing/unittest/registar.h>
 
-#include <util/generic/algorithm.h>
+#include <util/generic/algorithm.h>
 #include <util/generic/vector.h>
 #include <util/generic/ptr.h>
-#include <util/system/atomic.h>
-#include <util/thread/pool.h>
+#include <util/system/atomic.h>
+#include <util/thread/pool.h>
 
 #include "lfqueue.h"
@@ -191,117 +191,117 @@ Y_UNIT_TEST_SUITE(TLockFreeQueueTests) {
     }
 
     void DequeueAllRunner(TLockFreeQueue<int>& queue, bool singleConsumer) {
-        size_t threadsNum = 4;
-        size_t enqueuesPerThread = 10'000;
-        TThreadPool p;
-        p.Start(threadsNum, 0);
-
-        TVector<NThreading::TFuture<void>> futures;
-
-        for (size_t i = 0; i < threadsNum; ++i) {
-            NThreading::TPromise<void> promise = NThreading::NewPromise();
-            futures.emplace_back(promise.GetFuture());
-
-            p.SafeAddFunc([enqueuesPerThread, &queue, promise]() mutable {
-                for (size_t i = 0; i != enqueuesPerThread; ++i) {
-                    queue.Enqueue(i);
-                }
-
-                promise.SetValue();
-            });
-        }
-
-        TAtomic elementsLeft;
-        AtomicSet(elementsLeft, threadsNum * enqueuesPerThread);
-
-        ui64 numOfConsumers = singleConsumer ? 1 : threadsNum;
-
-        TVector<TVector<int>> dataBuckets(numOfConsumers);
-
-        for (size_t i = 0; i < numOfConsumers; ++i) {
-            NThreading::TPromise<void> promise = NThreading::NewPromise();
-            futures.emplace_back(promise.GetFuture());
-
-            p.SafeAddFunc([&queue, &elementsLeft, promise, consumerData{&dataBuckets[i]}]() mutable {
-                TVector<int> vec;
-                while (static_cast<i64>(AtomicGet(elementsLeft)) > 0) {
-                    for (size_t i = 0; i != 100; ++i) {
-                        vec.clear();
-                        queue.DequeueAll(&vec);
-
-                        AtomicSub(elementsLeft, vec.size());
-                        consumerData->insert(consumerData->end(), vec.begin(), vec.end());
-                    }
-                }
-
-                promise.SetValue();
-            });
-        }
-
+        size_t threadsNum = 4;
+        size_t enqueuesPerThread = 10'000;
+        TThreadPool p;
+        p.Start(threadsNum, 0);
+
+        TVector<NThreading::TFuture<void>> futures;
+
+        for (size_t i = 0; i < threadsNum; ++i) {
+            NThreading::TPromise<void> promise = NThreading::NewPromise();
+            futures.emplace_back(promise.GetFuture());
+
+            p.SafeAddFunc([enqueuesPerThread, &queue, promise]() mutable {
+                for (size_t i = 0; i != enqueuesPerThread; ++i) {
+                    queue.Enqueue(i);
+                }
+
+                promise.SetValue();
+            });
+        }
+
+        TAtomic elementsLeft;
+        AtomicSet(elementsLeft, threadsNum * enqueuesPerThread);
+
+        ui64 numOfConsumers = singleConsumer ? 1 : threadsNum;
+
+        TVector<TVector<int>> dataBuckets(numOfConsumers);
+
+        for (size_t i = 0; i < numOfConsumers; ++i) {
+            NThreading::TPromise<void> promise = NThreading::NewPromise();
+            futures.emplace_back(promise.GetFuture());
+
+            p.SafeAddFunc([&queue, &elementsLeft, promise, consumerData{&dataBuckets[i]}]() mutable {
+                TVector<int> vec;
+                while (static_cast<i64>(AtomicGet(elementsLeft)) > 0) {
+                    for (size_t i = 0; i != 100; ++i) {
+                        vec.clear();
+                        queue.DequeueAll(&vec);
+
+                        AtomicSub(elementsLeft, vec.size());
+                        consumerData->insert(consumerData->end(), vec.begin(), vec.end());
+                    }
+                }
+
+                promise.SetValue();
+            });
+        }
+
         NThreading::WaitExceptionOrAll(futures).GetValueSync();
-        p.Stop();
-
-        TVector<int> left;
-        queue.DequeueAll(&left);
-
-        UNIT_ASSERT(left.empty());
-
-        TVector<int> data;
-        for (auto& dataBucket : dataBuckets) {
-            data.insert(data.end(), dataBucket.begin(), dataBucket.end());
-        }
-
-        UNIT_ASSERT_EQUAL(data.size(), threadsNum * enqueuesPerThread);
-
-        size_t threadIdx = 0;
-        size_t cntValue = 0;
-
-        Sort(data.begin(), data.end());
-        for (size_t i = 0; i != data.size(); ++i) {
-            UNIT_ASSERT_VALUES_EQUAL(cntValue, data[i]);
-            ++threadIdx;
-
-            if (threadIdx == threadsNum) {
-                ++cntValue;
-                threadIdx = 0;
-            }
-        }
-    }
-
-    Y_UNIT_TEST(TestDequeueAllSingleConsumer) {
-        TLockFreeQueue<int> queue;
-        DequeueAllRunner(queue, true);
-    }
-
-    Y_UNIT_TEST(TestDequeueAllMultipleConsumers) {
-        TLockFreeQueue<int> queue;
-        DequeueAllRunner(queue, false);
-    }
-
-    Y_UNIT_TEST(TestDequeueAllEmptyQueue) {
-        TLockFreeQueue<int> queue;
-        TVector<int> vec;
-
-        queue.DequeueAll(&vec);
-
-        UNIT_ASSERT(vec.empty());
-    }
-
-    Y_UNIT_TEST(TestDequeueAllQueueOrder) {
-        TLockFreeQueue<int> queue;
-        queue.Enqueue(1);
-        queue.Enqueue(2);
-        queue.Enqueue(3);
-
-        TVector<int> v;
-        queue.DequeueAll(&v);
-
-        UNIT_ASSERT_VALUES_EQUAL(v.size(), 3);
-        UNIT_ASSERT_VALUES_EQUAL(v[0], 1);
-        UNIT_ASSERT_VALUES_EQUAL(v[1], 2);
-        UNIT_ASSERT_VALUES_EQUAL(v[2], 3);
-    }
-
+        p.Stop();
+
+        TVector<int> left;
+        queue.DequeueAll(&left);
+
+        UNIT_ASSERT(left.empty());
+
+        TVector<int> data;
+        for (auto& dataBucket : dataBuckets) {
+            data.insert(data.end(), dataBucket.begin(), dataBucket.end());
+        }
+
+        UNIT_ASSERT_EQUAL(data.size(), threadsNum * enqueuesPerThread);
+
+        size_t threadIdx = 0;
+        size_t cntValue = 0;
+
+        Sort(data.begin(), data.end());
+        for (size_t i = 0; i != data.size(); ++i) {
+            UNIT_ASSERT_VALUES_EQUAL(cntValue, data[i]);
+            ++threadIdx;
+
+            if (threadIdx == threadsNum) {
+                ++cntValue;
+                threadIdx = 0;
+            }
+        }
+    }
+
+    Y_UNIT_TEST(TestDequeueAllSingleConsumer) {
+        TLockFreeQueue<int> queue;
+        DequeueAllRunner(queue, true);
+    }
+
+    Y_UNIT_TEST(TestDequeueAllMultipleConsumers) {
+        TLockFreeQueue<int> queue;
+        DequeueAllRunner(queue, false);
+    }
+
+    Y_UNIT_TEST(TestDequeueAllEmptyQueue) {
+        TLockFreeQueue<int> queue;
+        TVector<int> vec;
+
+        queue.DequeueAll(&vec);
+
+        UNIT_ASSERT(vec.empty());
+    }
+
+    Y_UNIT_TEST(TestDequeueAllQueueOrder) {
+        TLockFreeQueue<int> queue;
+        queue.Enqueue(1);
+        queue.Enqueue(2);
+        queue.Enqueue(3);
+
+        TVector<int> v;
+        queue.DequeueAll(&v);
+
+        UNIT_ASSERT_VALUES_EQUAL(v.size(), 3);
+        UNIT_ASSERT_VALUES_EQUAL(v[0], 1);
+        UNIT_ASSERT_VALUES_EQUAL(v[1], 2);
+        UNIT_ASSERT_VALUES_EQUAL(v[2], 3);
+    }
+
     Y_UNIT_TEST(CleanInDestructor) {
         TSimpleSharedPtr<bool> p(new bool);
         UNIT_ASSERT_VALUES_EQUAL(1u, p.RefCount());
diff --git a/util/thread/pool.cpp b/util/thread/pool.cpp
index 05fad02e9b..f3c4ecc7bc 100644
--- a/util/thread/pool.cpp
+++ b/util/thread/pool.cpp
@@ -143,10 +143,10 @@ public:
         return ThreadCountExpected;
     }
 
-    inline size_t GetThreadCountReal() const noexcept {
-        return ThreadCountReal;
-    }
-
+    inline size_t GetThreadCountReal() const noexcept {
+        return ThreadCountReal;
+    }
+
     inline void AtforkAction() noexcept Y_NO_SANITIZE("thread") {
         Forked = true;
     }
@@ -326,29 +326,29 @@ size_t TThreadPool::Size() const noexcept {
 }
 
 size_t TThreadPool::GetThreadCountExpected() const noexcept {
-    if (!Impl_.Get()) {
-        return 0;
-    }
-
-    return Impl_->GetThreadCountExpected();
-}
-
+    if (!Impl_.Get()) {
+        return 0;
+    }
+
+    return Impl_->GetThreadCountExpected();
+}
+
 size_t TThreadPool::GetThreadCountReal() const noexcept {
-    if (!Impl_.Get()) {
-        return 0;
-    }
-
-    return Impl_->GetThreadCountReal();
-}
-
+    if (!Impl_.Get()) {
+        return 0;
+    }
+
+    return Impl_->GetThreadCountReal();
+}
+
 size_t TThreadPool::GetMaxQueueSize() const noexcept {
-    if (!Impl_.Get()) {
-        return 0;
-    }
-
-    return Impl_->GetMaxQueueSize();
-}
-
+    if (!Impl_.Get()) {
+        return 0;
+    }
+
+    return Impl_->GetMaxQueueSize();
+}
+
 bool TThreadPool::Add(IObjectInQueue* obj) {
     Y_ENSURE_EX(Impl_.Get(), TThreadPoolException() << TStringBuf("mtp queue not started"));
diff --git a/util/thread/pool.h b/util/thread/pool.h
index d1ea3a67cb..da6a475038 100644
--- a/util/thread/pool.h
+++ b/util/thread/pool.h
@@ -277,9 +277,9 @@ public:
     void Start(size_t threadCount, size_t queueSizeLimit = 0) override;
     void Stop() noexcept override;
     size_t Size() const noexcept override;
-    size_t GetThreadCountExpected() const noexcept;
-    size_t GetThreadCountReal() const noexcept;
-    size_t GetMaxQueueSize() const noexcept;
+    size_t GetThreadCountExpected() const noexcept;
+    size_t GetThreadCountReal() const noexcept;
+    size_t GetMaxQueueSize() const noexcept;
 
 private:
     class TImpl;
diff --git a/util/thread/pool_ut.cpp b/util/thread/pool_ut.cpp
index 893770d0c4..b44ea9694c 100644
--- a/util/thread/pool_ut.cpp
+++ b/util/thread/pool_ut.cpp
@@ -158,26 +158,26 @@ Y_UNIT_TEST_SUITE(TThreadPoolTest) {
 
         queue.Stop();
     }
-
-    Y_UNIT_TEST(TestInfoGetters) {
+
+    Y_UNIT_TEST(TestInfoGetters) {
         TThreadPool queue;
-
-        queue.Start(2, 7);
-
-        UNIT_ASSERT_EQUAL(queue.GetThreadCountExpected(), 2);
-        UNIT_ASSERT_EQUAL(queue.GetThreadCountReal(), 2);
-        UNIT_ASSERT_EQUAL(queue.GetMaxQueueSize(), 7);
-
-        queue.Stop();
-
-        queue.Start(4, 1);
-
-        UNIT_ASSERT_EQUAL(queue.GetThreadCountExpected(), 4);
-        UNIT_ASSERT_EQUAL(queue.GetThreadCountReal(), 4);
-        UNIT_ASSERT_EQUAL(queue.GetMaxQueueSize(), 1);
-
-        queue.Stop();
-    }
+
+        queue.Start(2, 7);
+
+        UNIT_ASSERT_EQUAL(queue.GetThreadCountExpected(), 2);
+        UNIT_ASSERT_EQUAL(queue.GetThreadCountReal(), 2);
+        UNIT_ASSERT_EQUAL(queue.GetMaxQueueSize(), 7);
+
+        queue.Stop();
+
+        queue.Start(4, 1);
+
+        UNIT_ASSERT_EQUAL(queue.GetThreadCountExpected(), 4);
+        UNIT_ASSERT_EQUAL(queue.GetThreadCountReal(), 4);
+        UNIT_ASSERT_EQUAL(queue.GetMaxQueueSize(), 1);
+
+        queue.Stop();
+    }
 
     void TestFixedThreadName(IThreadPool& pool, const TString& expectedName) {
         pool.Start(1);
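
The change above is whitespace-only (each removed line is re-added verbatim to restore blame), but it re-touches two util APIs that the affected tests exercise: TLockFreeQueue::DequeueAll and the TThreadPool getters GetThreadCountExpected/GetThreadCountReal/GetMaxQueueSize. For orientation, here is a minimal usage sketch; it assumes an Arcadia-style build where the util/ headers are available under the paths shown, and the main() wrapper plus the Cout output are illustrative additions, not part of this commit.

    #include <util/generic/vector.h>
    #include <util/stream/output.h>
    #include <util/thread/lfqueue.h>
    #include <util/thread/pool.h>

    int main() {
        // Drain a lock-free queue in one shot. As TestDequeueAllQueueOrder checks,
        // DequeueAll() preserves the FIFO order of the enqueued elements and
        // leaves the queue empty afterwards.
        TLockFreeQueue<int> queue;
        queue.Enqueue(1);
        queue.Enqueue(2);
        queue.Enqueue(3);

        TVector<int> items;
        queue.DequeueAll(&items); // items == {1, 2, 3}

        // The TThreadPool getters report the configuration passed to Start().
        TThreadPool pool;
        pool.Start(/* threadCount = */ 2, /* queueSizeLimit = */ 7);
        Cout << pool.GetThreadCountReal() << " threads, queue limit "
             << pool.GetMaxQueueSize() << Endl;
        pool.Stop();

        return 0;
    }

As the @@ -318,31 hunk shows, DequeueAll detaches the entire queue with one successful AtomicCas on the root node and then fills the caller's container from the already-ordered PopQueue followed by a reversed copy of the PushQueue, so a full drain needs no per-element CAS loop.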