aboutsummaryrefslogtreecommitdiffstats
path: root/util/thread/lfqueue_ut.cpp
diff options
context:
space:
mode:
authorivanzhukov <ivanzhukov@yandex-team.ru>2022-02-10 16:49:41 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:49:41 +0300
commitca3252a147a429eac4ba8221857493c58dcd09b5 (patch)
tree5d5cb817648f650d76cf1076100726fd9b8448e8 /util/thread/lfqueue_ut.cpp
parent0892d79ab411592ad25175c4bdadbcb09b466cf5 (diff)
downloadydb-ca3252a147a429eac4ba8221857493c58dcd09b5.tar.gz
Restoring authorship annotation for <ivanzhukov@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'util/thread/lfqueue_ut.cpp')
-rw-r--r--util/thread/lfqueue_ut.cpp226
1 files changed, 113 insertions, 113 deletions
diff --git a/util/thread/lfqueue_ut.cpp b/util/thread/lfqueue_ut.cpp
index 96905a4c84..83bca100cf 100644
--- a/util/thread/lfqueue_ut.cpp
+++ b/util/thread/lfqueue_ut.cpp
@@ -1,11 +1,11 @@
#include <library/cpp/threading/future/future.h>
#include <library/cpp/testing/unittest/registar.h>
-#include <util/generic/algorithm.h>
+#include <util/generic/algorithm.h>
#include <util/generic/vector.h>
#include <util/generic/ptr.h>
-#include <util/system/atomic.h>
-#include <util/thread/pool.h>
+#include <util/system/atomic.h>
+#include <util/thread/pool.h>
#include "lfqueue.h"
@@ -191,117 +191,117 @@ Y_UNIT_TEST_SUITE(TLockFreeQueueTests) {
}
void DequeueAllRunner(TLockFreeQueue<int>& queue, bool singleConsumer) {
- size_t threadsNum = 4;
- size_t enqueuesPerThread = 10'000;
- TThreadPool p;
- p.Start(threadsNum, 0);
-
- TVector<NThreading::TFuture<void>> futures;
-
- for (size_t i = 0; i < threadsNum; ++i) {
- NThreading::TPromise<void> promise = NThreading::NewPromise();
- futures.emplace_back(promise.GetFuture());
-
- p.SafeAddFunc([enqueuesPerThread, &queue, promise]() mutable {
- for (size_t i = 0; i != enqueuesPerThread; ++i) {
- queue.Enqueue(i);
- }
-
- promise.SetValue();
- });
- }
-
- TAtomic elementsLeft;
- AtomicSet(elementsLeft, threadsNum * enqueuesPerThread);
-
- ui64 numOfConsumers = singleConsumer ? 1 : threadsNum;
-
- TVector<TVector<int>> dataBuckets(numOfConsumers);
-
- for (size_t i = 0; i < numOfConsumers; ++i) {
- NThreading::TPromise<void> promise = NThreading::NewPromise();
- futures.emplace_back(promise.GetFuture());
-
- p.SafeAddFunc([&queue, &elementsLeft, promise, consumerData{&dataBuckets[i]}]() mutable {
- TVector<int> vec;
- while (static_cast<i64>(AtomicGet(elementsLeft)) > 0) {
- for (size_t i = 0; i != 100; ++i) {
- vec.clear();
- queue.DequeueAll(&vec);
-
- AtomicSub(elementsLeft, vec.size());
- consumerData->insert(consumerData->end(), vec.begin(), vec.end());
- }
- }
-
- promise.SetValue();
- });
- }
-
+ size_t threadsNum = 4;
+ size_t enqueuesPerThread = 10'000;
+ TThreadPool p;
+ p.Start(threadsNum, 0);
+
+ TVector<NThreading::TFuture<void>> futures;
+
+ for (size_t i = 0; i < threadsNum; ++i) {
+ NThreading::TPromise<void> promise = NThreading::NewPromise();
+ futures.emplace_back(promise.GetFuture());
+
+ p.SafeAddFunc([enqueuesPerThread, &queue, promise]() mutable {
+ for (size_t i = 0; i != enqueuesPerThread; ++i) {
+ queue.Enqueue(i);
+ }
+
+ promise.SetValue();
+ });
+ }
+
+ TAtomic elementsLeft;
+ AtomicSet(elementsLeft, threadsNum * enqueuesPerThread);
+
+ ui64 numOfConsumers = singleConsumer ? 1 : threadsNum;
+
+ TVector<TVector<int>> dataBuckets(numOfConsumers);
+
+ for (size_t i = 0; i < numOfConsumers; ++i) {
+ NThreading::TPromise<void> promise = NThreading::NewPromise();
+ futures.emplace_back(promise.GetFuture());
+
+ p.SafeAddFunc([&queue, &elementsLeft, promise, consumerData{&dataBuckets[i]}]() mutable {
+ TVector<int> vec;
+ while (static_cast<i64>(AtomicGet(elementsLeft)) > 0) {
+ for (size_t i = 0; i != 100; ++i) {
+ vec.clear();
+ queue.DequeueAll(&vec);
+
+ AtomicSub(elementsLeft, vec.size());
+ consumerData->insert(consumerData->end(), vec.begin(), vec.end());
+ }
+ }
+
+ promise.SetValue();
+ });
+ }
+
NThreading::WaitExceptionOrAll(futures).GetValueSync();
- p.Stop();
-
- TVector<int> left;
- queue.DequeueAll(&left);
-
- UNIT_ASSERT(left.empty());
-
- TVector<int> data;
- for (auto& dataBucket : dataBuckets) {
- data.insert(data.end(), dataBucket.begin(), dataBucket.end());
- }
-
- UNIT_ASSERT_EQUAL(data.size(), threadsNum * enqueuesPerThread);
-
- size_t threadIdx = 0;
- size_t cntValue = 0;
-
- Sort(data.begin(), data.end());
- for (size_t i = 0; i != data.size(); ++i) {
- UNIT_ASSERT_VALUES_EQUAL(cntValue, data[i]);
- ++threadIdx;
-
- if (threadIdx == threadsNum) {
- ++cntValue;
- threadIdx = 0;
- }
- }
- }
-
- Y_UNIT_TEST(TestDequeueAllSingleConsumer) {
- TLockFreeQueue<int> queue;
- DequeueAllRunner(queue, true);
- }
-
- Y_UNIT_TEST(TestDequeueAllMultipleConsumers) {
- TLockFreeQueue<int> queue;
- DequeueAllRunner(queue, false);
- }
-
- Y_UNIT_TEST(TestDequeueAllEmptyQueue) {
- TLockFreeQueue<int> queue;
- TVector<int> vec;
-
- queue.DequeueAll(&vec);
-
- UNIT_ASSERT(vec.empty());
- }
-
- Y_UNIT_TEST(TestDequeueAllQueueOrder) {
- TLockFreeQueue<int> queue;
- queue.Enqueue(1);
- queue.Enqueue(2);
- queue.Enqueue(3);
-
- TVector<int> v;
- queue.DequeueAll(&v);
-
- UNIT_ASSERT_VALUES_EQUAL(v.size(), 3);
- UNIT_ASSERT_VALUES_EQUAL(v[0], 1);
- UNIT_ASSERT_VALUES_EQUAL(v[1], 2);
- UNIT_ASSERT_VALUES_EQUAL(v[2], 3);
- }
-
+ p.Stop();
+
+ TVector<int> left;
+ queue.DequeueAll(&left);
+
+ UNIT_ASSERT(left.empty());
+
+ TVector<int> data;
+ for (auto& dataBucket : dataBuckets) {
+ data.insert(data.end(), dataBucket.begin(), dataBucket.end());
+ }
+
+ UNIT_ASSERT_EQUAL(data.size(), threadsNum * enqueuesPerThread);
+
+ size_t threadIdx = 0;
+ size_t cntValue = 0;
+
+ Sort(data.begin(), data.end());
+ for (size_t i = 0; i != data.size(); ++i) {
+ UNIT_ASSERT_VALUES_EQUAL(cntValue, data[i]);
+ ++threadIdx;
+
+ if (threadIdx == threadsNum) {
+ ++cntValue;
+ threadIdx = 0;
+ }
+ }
+ }
+
+ Y_UNIT_TEST(TestDequeueAllSingleConsumer) {
+ TLockFreeQueue<int> queue;
+ DequeueAllRunner(queue, true);
+ }
+
+ Y_UNIT_TEST(TestDequeueAllMultipleConsumers) {
+ TLockFreeQueue<int> queue;
+ DequeueAllRunner(queue, false);
+ }
+
+ Y_UNIT_TEST(TestDequeueAllEmptyQueue) {
+ TLockFreeQueue<int> queue;
+ TVector<int> vec;
+
+ queue.DequeueAll(&vec);
+
+ UNIT_ASSERT(vec.empty());
+ }
+
+ Y_UNIT_TEST(TestDequeueAllQueueOrder) {
+ TLockFreeQueue<int> queue;
+ queue.Enqueue(1);
+ queue.Enqueue(2);
+ queue.Enqueue(3);
+
+ TVector<int> v;
+ queue.DequeueAll(&v);
+
+ UNIT_ASSERT_VALUES_EQUAL(v.size(), 3);
+ UNIT_ASSERT_VALUES_EQUAL(v[0], 1);
+ UNIT_ASSERT_VALUES_EQUAL(v[1], 2);
+ UNIT_ASSERT_VALUES_EQUAL(v[2], 3);
+ }
+
Y_UNIT_TEST(CleanInDestructor) {
TSimpleSharedPtr<bool> p(new bool);
UNIT_ASSERT_VALUES_EQUAL(1u, p.RefCount());