aboutsummaryrefslogtreecommitdiffstats
path: root/util/thread/lfqueue_ut.cpp
diff options
context:
space:
mode:
authorDevtools Arcadia <arcadia-devtools@yandex-team.ru>2022-02-07 18:08:42 +0300
committerDevtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net>2022-02-07 18:08:42 +0300
commit1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch)
treee26c9fed0de5d9873cce7e00bc214573dc2195b7 /util/thread/lfqueue_ut.cpp
downloadydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'util/thread/lfqueue_ut.cpp')
-rw-r--r--util/thread/lfqueue_ut.cpp333
1 files changed, 333 insertions, 0 deletions
diff --git a/util/thread/lfqueue_ut.cpp b/util/thread/lfqueue_ut.cpp
new file mode 100644
index 0000000000..83bca100cf
--- /dev/null
+++ b/util/thread/lfqueue_ut.cpp
@@ -0,0 +1,333 @@
+#include <library/cpp/threading/future/future.h>
+#include <library/cpp/testing/unittest/registar.h>
+
+#include <util/generic/algorithm.h>
+#include <util/generic/vector.h>
+#include <util/generic/ptr.h>
+#include <util/system/atomic.h>
+#include <util/thread/pool.h>
+
+#include "lfqueue.h"
+
+class TMoveTest {
+public:
+ TMoveTest(int marker = 0, int value = 0)
+ : Marker_(marker)
+ , Value_(value)
+ {
+ }
+
+ TMoveTest(const TMoveTest& other) {
+ *this = other;
+ }
+
+ TMoveTest(TMoveTest&& other) {
+ *this = std::move(other);
+ }
+
+ TMoveTest& operator=(const TMoveTest& other) {
+ Value_ = other.Value_;
+ Marker_ = other.Marker_ + 1024;
+ return *this;
+ }
+
+ TMoveTest& operator=(TMoveTest&& other) {
+ Value_ = other.Value_;
+ Marker_ = other.Marker_;
+ other.Marker_ = 0;
+ return *this;
+ }
+
+ int Marker() const {
+ return Marker_;
+ }
+
+ int Value() const {
+ return Value_;
+ }
+
+private:
+ int Marker_ = 0;
+ int Value_ = 0;
+};
+
+class TOperationsChecker {
+public:
+ TOperationsChecker() {
+ ++DefaultCtor_;
+ }
+
+ TOperationsChecker(TOperationsChecker&&) {
+ ++MoveCtor_;
+ }
+
+ TOperationsChecker(const TOperationsChecker&) {
+ ++CopyCtor_;
+ }
+
+ TOperationsChecker& operator=(TOperationsChecker&&) {
+ ++MoveAssign_;
+ return *this;
+ }
+
+ TOperationsChecker& operator=(const TOperationsChecker&) {
+ ++CopyAssign_;
+ return *this;
+ }
+
+ static void Check(int defaultCtor, int moveCtor, int copyCtor, int moveAssign, int copyAssign) {
+ UNIT_ASSERT_VALUES_EQUAL(defaultCtor, DefaultCtor_);
+ UNIT_ASSERT_VALUES_EQUAL(moveCtor, MoveCtor_);
+ UNIT_ASSERT_VALUES_EQUAL(copyCtor, CopyCtor_);
+ UNIT_ASSERT_VALUES_EQUAL(moveAssign, MoveAssign_);
+ UNIT_ASSERT_VALUES_EQUAL(copyAssign, CopyAssign_);
+ Clear();
+ }
+
+private:
+ static void Clear() {
+ DefaultCtor_ = MoveCtor_ = CopyCtor_ = MoveAssign_ = CopyAssign_ = 0;
+ }
+
+ static int DefaultCtor_;
+ static int MoveCtor_;
+ static int CopyCtor_;
+ static int MoveAssign_;
+ static int CopyAssign_;
+};
+
+int TOperationsChecker::DefaultCtor_ = 0;
+int TOperationsChecker::MoveCtor_ = 0;
+int TOperationsChecker::CopyCtor_ = 0;
+int TOperationsChecker::MoveAssign_ = 0;
+int TOperationsChecker::CopyAssign_ = 0;
+
+Y_UNIT_TEST_SUITE(TLockFreeQueueTests) {
+ Y_UNIT_TEST(TestMoveEnqueue) {
+ TMoveTest value(0xFF, 0xAA);
+ TMoveTest tmp;
+
+ TLockFreeQueue<TMoveTest> queue;
+
+ queue.Enqueue(value);
+ UNIT_ASSERT_VALUES_EQUAL(value.Marker(), 0xFF);
+ UNIT_ASSERT(queue.Dequeue(&tmp));
+ UNIT_ASSERT_VALUES_UNEQUAL(tmp.Marker(), 0xFF);
+ UNIT_ASSERT_VALUES_EQUAL(tmp.Value(), 0xAA);
+
+ queue.Enqueue(std::move(value));
+ UNIT_ASSERT_VALUES_EQUAL(value.Marker(), 0);
+ UNIT_ASSERT(queue.Dequeue(&tmp));
+ UNIT_ASSERT_VALUES_EQUAL(tmp.Value(), 0xAA);
+ }
+
+ Y_UNIT_TEST(TestSimpleEnqueueDequeue) {
+ TLockFreeQueue<int> queue;
+
+ int i = -1;
+
+ UNIT_ASSERT(!queue.Dequeue(&i));
+ UNIT_ASSERT_VALUES_EQUAL(i, -1);
+
+ queue.Enqueue(10);
+ queue.Enqueue(11);
+ queue.Enqueue(12);
+
+ UNIT_ASSERT(queue.Dequeue(&i));
+ UNIT_ASSERT_VALUES_EQUAL(10, i);
+ UNIT_ASSERT(queue.Dequeue(&i));
+ UNIT_ASSERT_VALUES_EQUAL(11, i);
+
+ queue.Enqueue(13);
+
+ UNIT_ASSERT(queue.Dequeue(&i));
+ UNIT_ASSERT_VALUES_EQUAL(12, i);
+ UNIT_ASSERT(queue.Dequeue(&i));
+ UNIT_ASSERT_VALUES_EQUAL(13, i);
+
+ UNIT_ASSERT(!queue.Dequeue(&i));
+
+ const int tmp = 100;
+ queue.Enqueue(tmp);
+ UNIT_ASSERT(queue.Dequeue(&i));
+ UNIT_ASSERT_VALUES_EQUAL(i, tmp);
+ }
+
+ Y_UNIT_TEST(TestSimpleEnqueueAllDequeue) {
+ TLockFreeQueue<int> queue;
+
+ int i = -1;
+
+ UNIT_ASSERT(!queue.Dequeue(&i));
+ UNIT_ASSERT_VALUES_EQUAL(i, -1);
+
+ TVector<int> v;
+ v.push_back(20);
+ v.push_back(21);
+
+ queue.EnqueueAll(v);
+
+ v.clear();
+ v.push_back(22);
+ v.push_back(23);
+ v.push_back(24);
+
+ queue.EnqueueAll(v);
+
+ v.clear();
+ queue.EnqueueAll(v);
+
+ v.clear();
+ v.push_back(25);
+
+ queue.EnqueueAll(v);
+
+ for (int j = 20; j <= 25; ++j) {
+ UNIT_ASSERT(queue.Dequeue(&i));
+ UNIT_ASSERT_VALUES_EQUAL(j, i);
+ }
+
+ UNIT_ASSERT(!queue.Dequeue(&i));
+ }
+
+ void DequeueAllRunner(TLockFreeQueue<int>& queue, bool singleConsumer) {
+ size_t threadsNum = 4;
+ size_t enqueuesPerThread = 10'000;
+ TThreadPool p;
+ p.Start(threadsNum, 0);
+
+ TVector<NThreading::TFuture<void>> futures;
+
+ for (size_t i = 0; i < threadsNum; ++i) {
+ NThreading::TPromise<void> promise = NThreading::NewPromise();
+ futures.emplace_back(promise.GetFuture());
+
+ p.SafeAddFunc([enqueuesPerThread, &queue, promise]() mutable {
+ for (size_t i = 0; i != enqueuesPerThread; ++i) {
+ queue.Enqueue(i);
+ }
+
+ promise.SetValue();
+ });
+ }
+
+ TAtomic elementsLeft;
+ AtomicSet(elementsLeft, threadsNum * enqueuesPerThread);
+
+ ui64 numOfConsumers = singleConsumer ? 1 : threadsNum;
+
+ TVector<TVector<int>> dataBuckets(numOfConsumers);
+
+ for (size_t i = 0; i < numOfConsumers; ++i) {
+ NThreading::TPromise<void> promise = NThreading::NewPromise();
+ futures.emplace_back(promise.GetFuture());
+
+ p.SafeAddFunc([&queue, &elementsLeft, promise, consumerData{&dataBuckets[i]}]() mutable {
+ TVector<int> vec;
+ while (static_cast<i64>(AtomicGet(elementsLeft)) > 0) {
+ for (size_t i = 0; i != 100; ++i) {
+ vec.clear();
+ queue.DequeueAll(&vec);
+
+ AtomicSub(elementsLeft, vec.size());
+ consumerData->insert(consumerData->end(), vec.begin(), vec.end());
+ }
+ }
+
+ promise.SetValue();
+ });
+ }
+
+ NThreading::WaitExceptionOrAll(futures).GetValueSync();
+ p.Stop();
+
+ TVector<int> left;
+ queue.DequeueAll(&left);
+
+ UNIT_ASSERT(left.empty());
+
+ TVector<int> data;
+ for (auto& dataBucket : dataBuckets) {
+ data.insert(data.end(), dataBucket.begin(), dataBucket.end());
+ }
+
+ UNIT_ASSERT_EQUAL(data.size(), threadsNum * enqueuesPerThread);
+
+ size_t threadIdx = 0;
+ size_t cntValue = 0;
+
+ Sort(data.begin(), data.end());
+ for (size_t i = 0; i != data.size(); ++i) {
+ UNIT_ASSERT_VALUES_EQUAL(cntValue, data[i]);
+ ++threadIdx;
+
+ if (threadIdx == threadsNum) {
+ ++cntValue;
+ threadIdx = 0;
+ }
+ }
+ }
+
+ Y_UNIT_TEST(TestDequeueAllSingleConsumer) {
+ TLockFreeQueue<int> queue;
+ DequeueAllRunner(queue, true);
+ }
+
+ Y_UNIT_TEST(TestDequeueAllMultipleConsumers) {
+ TLockFreeQueue<int> queue;
+ DequeueAllRunner(queue, false);
+ }
+
+ Y_UNIT_TEST(TestDequeueAllEmptyQueue) {
+ TLockFreeQueue<int> queue;
+ TVector<int> vec;
+
+ queue.DequeueAll(&vec);
+
+ UNIT_ASSERT(vec.empty());
+ }
+
+ Y_UNIT_TEST(TestDequeueAllQueueOrder) {
+ TLockFreeQueue<int> queue;
+ queue.Enqueue(1);
+ queue.Enqueue(2);
+ queue.Enqueue(3);
+
+ TVector<int> v;
+ queue.DequeueAll(&v);
+
+ UNIT_ASSERT_VALUES_EQUAL(v.size(), 3);
+ UNIT_ASSERT_VALUES_EQUAL(v[0], 1);
+ UNIT_ASSERT_VALUES_EQUAL(v[1], 2);
+ UNIT_ASSERT_VALUES_EQUAL(v[2], 3);
+ }
+
+ Y_UNIT_TEST(CleanInDestructor) {
+ TSimpleSharedPtr<bool> p(new bool);
+ UNIT_ASSERT_VALUES_EQUAL(1u, p.RefCount());
+
+ {
+ TLockFreeQueue<TSimpleSharedPtr<bool>> stack;
+
+ stack.Enqueue(p);
+ stack.Enqueue(p);
+
+ UNIT_ASSERT_VALUES_EQUAL(3u, p.RefCount());
+ }
+
+ UNIT_ASSERT_VALUES_EQUAL(1, p.RefCount());
+ }
+
+ Y_UNIT_TEST(CheckOperationsCount) {
+ TOperationsChecker o;
+ o.Check(1, 0, 0, 0, 0);
+ TLockFreeQueue<TOperationsChecker> queue;
+ o.Check(0, 0, 0, 0, 0);
+ queue.Enqueue(std::move(o));
+ o.Check(0, 1, 0, 0, 0);
+ queue.Enqueue(o);
+ o.Check(0, 0, 1, 0, 0);
+ queue.Dequeue(&o);
+ o.Check(0, 0, 2, 1, 0);
+ }
+}