diff options
author | Devtools Arcadia <arcadia-devtools@yandex-team.ru> | 2022-02-07 18:08:42 +0300 |
---|---|---|
committer | Devtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net> | 2022-02-07 18:08:42 +0300 |
commit | 1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch) | |
tree | e26c9fed0de5d9873cce7e00bc214573dc2195b7 /util/thread/lfqueue_ut.cpp | |
download | ydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz |
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'util/thread/lfqueue_ut.cpp')
-rw-r--r-- | util/thread/lfqueue_ut.cpp | 333 |
1 files changed, 333 insertions, 0 deletions
diff --git a/util/thread/lfqueue_ut.cpp b/util/thread/lfqueue_ut.cpp new file mode 100644 index 0000000000..83bca100cf --- /dev/null +++ b/util/thread/lfqueue_ut.cpp @@ -0,0 +1,333 @@ +#include <library/cpp/threading/future/future.h> +#include <library/cpp/testing/unittest/registar.h> + +#include <util/generic/algorithm.h> +#include <util/generic/vector.h> +#include <util/generic/ptr.h> +#include <util/system/atomic.h> +#include <util/thread/pool.h> + +#include "lfqueue.h" + +class TMoveTest { +public: + TMoveTest(int marker = 0, int value = 0) + : Marker_(marker) + , Value_(value) + { + } + + TMoveTest(const TMoveTest& other) { + *this = other; + } + + TMoveTest(TMoveTest&& other) { + *this = std::move(other); + } + + TMoveTest& operator=(const TMoveTest& other) { + Value_ = other.Value_; + Marker_ = other.Marker_ + 1024; + return *this; + } + + TMoveTest& operator=(TMoveTest&& other) { + Value_ = other.Value_; + Marker_ = other.Marker_; + other.Marker_ = 0; + return *this; + } + + int Marker() const { + return Marker_; + } + + int Value() const { + return Value_; + } + +private: + int Marker_ = 0; + int Value_ = 0; +}; + +class TOperationsChecker { +public: + TOperationsChecker() { + ++DefaultCtor_; + } + + TOperationsChecker(TOperationsChecker&&) { + ++MoveCtor_; + } + + TOperationsChecker(const TOperationsChecker&) { + ++CopyCtor_; + } + + TOperationsChecker& operator=(TOperationsChecker&&) { + ++MoveAssign_; + return *this; + } + + TOperationsChecker& operator=(const TOperationsChecker&) { + ++CopyAssign_; + return *this; + } + + static void Check(int defaultCtor, int moveCtor, int copyCtor, int moveAssign, int copyAssign) { + UNIT_ASSERT_VALUES_EQUAL(defaultCtor, DefaultCtor_); + UNIT_ASSERT_VALUES_EQUAL(moveCtor, MoveCtor_); + UNIT_ASSERT_VALUES_EQUAL(copyCtor, CopyCtor_); + UNIT_ASSERT_VALUES_EQUAL(moveAssign, MoveAssign_); + UNIT_ASSERT_VALUES_EQUAL(copyAssign, CopyAssign_); + Clear(); + } + +private: + static void Clear() { + DefaultCtor_ = MoveCtor_ = CopyCtor_ = MoveAssign_ = CopyAssign_ = 0; + } + + static int DefaultCtor_; + static int MoveCtor_; + static int CopyCtor_; + static int MoveAssign_; + static int CopyAssign_; +}; + +int TOperationsChecker::DefaultCtor_ = 0; +int TOperationsChecker::MoveCtor_ = 0; +int TOperationsChecker::CopyCtor_ = 0; +int TOperationsChecker::MoveAssign_ = 0; +int TOperationsChecker::CopyAssign_ = 0; + +Y_UNIT_TEST_SUITE(TLockFreeQueueTests) { + Y_UNIT_TEST(TestMoveEnqueue) { + TMoveTest value(0xFF, 0xAA); + TMoveTest tmp; + + TLockFreeQueue<TMoveTest> queue; + + queue.Enqueue(value); + UNIT_ASSERT_VALUES_EQUAL(value.Marker(), 0xFF); + UNIT_ASSERT(queue.Dequeue(&tmp)); + UNIT_ASSERT_VALUES_UNEQUAL(tmp.Marker(), 0xFF); + UNIT_ASSERT_VALUES_EQUAL(tmp.Value(), 0xAA); + + queue.Enqueue(std::move(value)); + UNIT_ASSERT_VALUES_EQUAL(value.Marker(), 0); + UNIT_ASSERT(queue.Dequeue(&tmp)); + UNIT_ASSERT_VALUES_EQUAL(tmp.Value(), 0xAA); + } + + Y_UNIT_TEST(TestSimpleEnqueueDequeue) { + TLockFreeQueue<int> queue; + + int i = -1; + + UNIT_ASSERT(!queue.Dequeue(&i)); + UNIT_ASSERT_VALUES_EQUAL(i, -1); + + queue.Enqueue(10); + queue.Enqueue(11); + queue.Enqueue(12); + + UNIT_ASSERT(queue.Dequeue(&i)); + UNIT_ASSERT_VALUES_EQUAL(10, i); + UNIT_ASSERT(queue.Dequeue(&i)); + UNIT_ASSERT_VALUES_EQUAL(11, i); + + queue.Enqueue(13); + + UNIT_ASSERT(queue.Dequeue(&i)); + UNIT_ASSERT_VALUES_EQUAL(12, i); + UNIT_ASSERT(queue.Dequeue(&i)); + UNIT_ASSERT_VALUES_EQUAL(13, i); + + UNIT_ASSERT(!queue.Dequeue(&i)); + + const int tmp = 100; + queue.Enqueue(tmp); + UNIT_ASSERT(queue.Dequeue(&i)); + UNIT_ASSERT_VALUES_EQUAL(i, tmp); + } + + Y_UNIT_TEST(TestSimpleEnqueueAllDequeue) { + TLockFreeQueue<int> queue; + + int i = -1; + + UNIT_ASSERT(!queue.Dequeue(&i)); + UNIT_ASSERT_VALUES_EQUAL(i, -1); + + TVector<int> v; + v.push_back(20); + v.push_back(21); + + queue.EnqueueAll(v); + + v.clear(); + v.push_back(22); + v.push_back(23); + v.push_back(24); + + queue.EnqueueAll(v); + + v.clear(); + queue.EnqueueAll(v); + + v.clear(); + v.push_back(25); + + queue.EnqueueAll(v); + + for (int j = 20; j <= 25; ++j) { + UNIT_ASSERT(queue.Dequeue(&i)); + UNIT_ASSERT_VALUES_EQUAL(j, i); + } + + UNIT_ASSERT(!queue.Dequeue(&i)); + } + + void DequeueAllRunner(TLockFreeQueue<int>& queue, bool singleConsumer) { + size_t threadsNum = 4; + size_t enqueuesPerThread = 10'000; + TThreadPool p; + p.Start(threadsNum, 0); + + TVector<NThreading::TFuture<void>> futures; + + for (size_t i = 0; i < threadsNum; ++i) { + NThreading::TPromise<void> promise = NThreading::NewPromise(); + futures.emplace_back(promise.GetFuture()); + + p.SafeAddFunc([enqueuesPerThread, &queue, promise]() mutable { + for (size_t i = 0; i != enqueuesPerThread; ++i) { + queue.Enqueue(i); + } + + promise.SetValue(); + }); + } + + TAtomic elementsLeft; + AtomicSet(elementsLeft, threadsNum * enqueuesPerThread); + + ui64 numOfConsumers = singleConsumer ? 1 : threadsNum; + + TVector<TVector<int>> dataBuckets(numOfConsumers); + + for (size_t i = 0; i < numOfConsumers; ++i) { + NThreading::TPromise<void> promise = NThreading::NewPromise(); + futures.emplace_back(promise.GetFuture()); + + p.SafeAddFunc([&queue, &elementsLeft, promise, consumerData{&dataBuckets[i]}]() mutable { + TVector<int> vec; + while (static_cast<i64>(AtomicGet(elementsLeft)) > 0) { + for (size_t i = 0; i != 100; ++i) { + vec.clear(); + queue.DequeueAll(&vec); + + AtomicSub(elementsLeft, vec.size()); + consumerData->insert(consumerData->end(), vec.begin(), vec.end()); + } + } + + promise.SetValue(); + }); + } + + NThreading::WaitExceptionOrAll(futures).GetValueSync(); + p.Stop(); + + TVector<int> left; + queue.DequeueAll(&left); + + UNIT_ASSERT(left.empty()); + + TVector<int> data; + for (auto& dataBucket : dataBuckets) { + data.insert(data.end(), dataBucket.begin(), dataBucket.end()); + } + + UNIT_ASSERT_EQUAL(data.size(), threadsNum * enqueuesPerThread); + + size_t threadIdx = 0; + size_t cntValue = 0; + + Sort(data.begin(), data.end()); + for (size_t i = 0; i != data.size(); ++i) { + UNIT_ASSERT_VALUES_EQUAL(cntValue, data[i]); + ++threadIdx; + + if (threadIdx == threadsNum) { + ++cntValue; + threadIdx = 0; + } + } + } + + Y_UNIT_TEST(TestDequeueAllSingleConsumer) { + TLockFreeQueue<int> queue; + DequeueAllRunner(queue, true); + } + + Y_UNIT_TEST(TestDequeueAllMultipleConsumers) { + TLockFreeQueue<int> queue; + DequeueAllRunner(queue, false); + } + + Y_UNIT_TEST(TestDequeueAllEmptyQueue) { + TLockFreeQueue<int> queue; + TVector<int> vec; + + queue.DequeueAll(&vec); + + UNIT_ASSERT(vec.empty()); + } + + Y_UNIT_TEST(TestDequeueAllQueueOrder) { + TLockFreeQueue<int> queue; + queue.Enqueue(1); + queue.Enqueue(2); + queue.Enqueue(3); + + TVector<int> v; + queue.DequeueAll(&v); + + UNIT_ASSERT_VALUES_EQUAL(v.size(), 3); + UNIT_ASSERT_VALUES_EQUAL(v[0], 1); + UNIT_ASSERT_VALUES_EQUAL(v[1], 2); + UNIT_ASSERT_VALUES_EQUAL(v[2], 3); + } + + Y_UNIT_TEST(CleanInDestructor) { + TSimpleSharedPtr<bool> p(new bool); + UNIT_ASSERT_VALUES_EQUAL(1u, p.RefCount()); + + { + TLockFreeQueue<TSimpleSharedPtr<bool>> stack; + + stack.Enqueue(p); + stack.Enqueue(p); + + UNIT_ASSERT_VALUES_EQUAL(3u, p.RefCount()); + } + + UNIT_ASSERT_VALUES_EQUAL(1, p.RefCount()); + } + + Y_UNIT_TEST(CheckOperationsCount) { + TOperationsChecker o; + o.Check(1, 0, 0, 0, 0); + TLockFreeQueue<TOperationsChecker> queue; + o.Check(0, 0, 0, 0, 0); + queue.Enqueue(std::move(o)); + o.Check(0, 1, 0, 0, 0); + queue.Enqueue(o); + o.Check(0, 0, 1, 0, 0); + queue.Dequeue(&o); + o.Check(0, 0, 2, 1, 0); + } +} |