aboutsummaryrefslogtreecommitdiffstats
path: root/util/thread/lfstack_ut.cpp
diff options
context:
space:
mode:
authorDevtools Arcadia <arcadia-devtools@yandex-team.ru>2022-02-07 18:08:42 +0300
committerDevtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net>2022-02-07 18:08:42 +0300
commit1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch)
treee26c9fed0de5d9873cce7e00bc214573dc2195b7 /util/thread/lfstack_ut.cpp
downloadydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'util/thread/lfstack_ut.cpp')
-rw-r--r--util/thread/lfstack_ut.cpp346
1 files changed, 346 insertions, 0 deletions
diff --git a/util/thread/lfstack_ut.cpp b/util/thread/lfstack_ut.cpp
new file mode 100644
index 0000000000..e20a838f95
--- /dev/null
+++ b/util/thread/lfstack_ut.cpp
@@ -0,0 +1,346 @@
+
+#include <util/system/atomic.h>
+#include <util/system/event.h>
+#include <util/generic/deque.h>
+#include <library/cpp/threading/future/legacy_future.h>
+
+#include <library/cpp/testing/unittest/registar.h>
+
+#include "lfstack.h"
+
+Y_UNIT_TEST_SUITE(TLockFreeStackTests) {
+ class TCountDownLatch {
+ private:
+ TAtomic Current_;
+ TSystemEvent EventObject_;
+
+ public:
+ TCountDownLatch(unsigned initial)
+ : Current_(initial)
+ {
+ }
+
+ void CountDown() {
+ if (AtomicDecrement(Current_) == 0) {
+ EventObject_.Signal();
+ }
+ }
+
+ void Await() {
+ EventObject_.Wait();
+ }
+
+ bool Await(TDuration timeout) {
+ return EventObject_.WaitT(timeout);
+ }
+ };
+
+ template <bool SingleConsumer>
+ struct TDequeueAllTester {
+ size_t EnqueueThreads;
+ size_t DequeueThreads;
+
+ size_t EnqueuesPerThread;
+ TAtomic LeftToDequeue;
+
+ TCountDownLatch StartLatch;
+ TLockFreeStack<int> Stack;
+
+ TDequeueAllTester()
+ : EnqueueThreads(4)
+ , DequeueThreads(SingleConsumer ? 1 : 3)
+ , EnqueuesPerThread(100000)
+ , LeftToDequeue(EnqueueThreads * EnqueuesPerThread)
+ , StartLatch(EnqueueThreads + DequeueThreads)
+ {
+ }
+
+ void Enqueuer() {
+ StartLatch.CountDown();
+ StartLatch.Await();
+
+ for (size_t i = 0; i < EnqueuesPerThread; ++i) {
+ Stack.Enqueue(i);
+ }
+ }
+
+ void DequeuerAll() {
+ StartLatch.CountDown();
+ StartLatch.Await();
+
+ TVector<int> temp;
+ while (AtomicGet(LeftToDequeue) > 0) {
+ size_t dequeued = 0;
+ for (size_t i = 0; i < 100; ++i) {
+ temp.clear();
+ if (SingleConsumer) {
+ Stack.DequeueAllSingleConsumer(&temp);
+ } else {
+ Stack.DequeueAll(&temp);
+ }
+ dequeued += temp.size();
+ }
+ AtomicAdd(LeftToDequeue, -dequeued);
+ }
+ }
+
+ void Run() {
+ TVector<TSimpleSharedPtr<NThreading::TLegacyFuture<>>> futures;
+
+ for (size_t i = 0; i < EnqueueThreads; ++i) {
+ futures.push_back(new NThreading::TLegacyFuture<>(std::bind(&TDequeueAllTester<SingleConsumer>::Enqueuer, this)));
+ }
+
+ for (size_t i = 0; i < DequeueThreads; ++i) {
+ futures.push_back(new NThreading::TLegacyFuture<>(std::bind(&TDequeueAllTester<SingleConsumer>::DequeuerAll, this)));
+ }
+
+ // effectively join
+ futures.clear();
+
+ UNIT_ASSERT_VALUES_EQUAL(0, int(AtomicGet(LeftToDequeue)));
+
+ TVector<int> left;
+ Stack.DequeueAll(&left);
+ UNIT_ASSERT(left.empty());
+ }
+ };
+
+ Y_UNIT_TEST(TestDequeueAll) {
+ TDequeueAllTester<false>().Run();
+ }
+
+ Y_UNIT_TEST(TestDequeueAllSingleConsumer) {
+ TDequeueAllTester<true>().Run();
+ }
+
+ Y_UNIT_TEST(TestDequeueAllEmptyStack) {
+ TLockFreeStack<int> stack;
+
+ TVector<int> r;
+ stack.DequeueAll(&r);
+
+ UNIT_ASSERT(r.empty());
+ }
+
+ Y_UNIT_TEST(TestDequeueAllReturnsInReverseOrder) {
+ TLockFreeStack<int> stack;
+
+ stack.Enqueue(17);
+ stack.Enqueue(19);
+ stack.Enqueue(23);
+
+ TVector<int> r;
+
+ stack.DequeueAll(&r);
+
+ UNIT_ASSERT_VALUES_EQUAL(size_t(3), r.size());
+ UNIT_ASSERT_VALUES_EQUAL(23, r.at(0));
+ UNIT_ASSERT_VALUES_EQUAL(19, r.at(1));
+ UNIT_ASSERT_VALUES_EQUAL(17, r.at(2));
+ }
+
+ Y_UNIT_TEST(TestEnqueueAll) {
+ TLockFreeStack<int> stack;
+
+ TVector<int> v;
+ TVector<int> expected;
+
+ stack.EnqueueAll(v); // add empty
+
+ v.push_back(2);
+ v.push_back(3);
+ v.push_back(5);
+ expected.insert(expected.end(), v.begin(), v.end());
+ stack.EnqueueAll(v);
+
+ v.clear();
+
+ stack.EnqueueAll(v); // add empty
+
+ v.push_back(7);
+ v.push_back(11);
+ v.push_back(13);
+ v.push_back(17);
+ expected.insert(expected.end(), v.begin(), v.end());
+ stack.EnqueueAll(v);
+
+ TVector<int> actual;
+ stack.DequeueAll(&actual);
+
+ UNIT_ASSERT_VALUES_EQUAL(expected.size(), actual.size());
+ for (size_t i = 0; i < actual.size(); ++i) {
+ UNIT_ASSERT_VALUES_EQUAL(expected.at(expected.size() - i - 1), actual.at(i));
+ }
+ }
+
+ Y_UNIT_TEST(CleanInDestructor) {
+ TSimpleSharedPtr<bool> p(new bool);
+ UNIT_ASSERT_VALUES_EQUAL(1u, p.RefCount());
+
+ {
+ TLockFreeStack<TSimpleSharedPtr<bool>> stack;
+
+ stack.Enqueue(p);
+ stack.Enqueue(p);
+
+ UNIT_ASSERT_VALUES_EQUAL(3u, p.RefCount());
+ }
+
+ UNIT_ASSERT_VALUES_EQUAL(1, p.RefCount());
+ }
+
+ Y_UNIT_TEST(NoCopyTest) {
+ static unsigned copied = 0;
+ struct TCopyCount {
+ TCopyCount(int) {
+ }
+ TCopyCount(const TCopyCount&) {
+ ++copied;
+ }
+
+ TCopyCount(TCopyCount&&) {
+ }
+
+ TCopyCount& operator=(const TCopyCount&) {
+ ++copied;
+ return *this;
+ }
+
+ TCopyCount& operator=(TCopyCount&&) {
+ return *this;
+ }
+ };
+
+ TLockFreeStack<TCopyCount> stack;
+ stack.Enqueue(TCopyCount(1));
+ TCopyCount val(0);
+ stack.Dequeue(&val);
+ UNIT_ASSERT_VALUES_EQUAL(0, copied);
+ }
+
+ Y_UNIT_TEST(MoveOnlyTest) {
+ TLockFreeStack<THolder<bool>> stack;
+ stack.Enqueue(MakeHolder<bool>(true));
+ THolder<bool> val;
+ stack.Dequeue(&val);
+ UNIT_ASSERT(val);
+ UNIT_ASSERT_VALUES_EQUAL(true, *val);
+ }
+
+ template <class TTest>
+ struct TMultiThreadTester {
+ using ThisType = TMultiThreadTester<TTest>;
+
+ size_t Threads;
+ size_t OperationsPerThread;
+
+ TCountDownLatch StartLatch;
+ TLockFreeStack<typename TTest::ValueType> Stack;
+
+ TMultiThreadTester()
+ : Threads(10)
+ , OperationsPerThread(100000)
+ , StartLatch(Threads)
+ {
+ }
+
+ void Worker() {
+ StartLatch.CountDown();
+ StartLatch.Await();
+
+ TVector<typename TTest::ValueType> unused;
+ for (size_t i = 0; i < OperationsPerThread; ++i) {
+ switch (GetCycleCount() % 4) {
+ case 0: {
+ TTest::Enqueue(Stack, i);
+ break;
+ }
+ case 1: {
+ TTest::Dequeue(Stack);
+ break;
+ }
+ case 2: {
+ TTest::EnqueueAll(Stack);
+ break;
+ }
+ case 3: {
+ TTest::DequeueAll(Stack);
+ break;
+ }
+ }
+ }
+ }
+
+ void Run() {
+ TDeque<NThreading::TLegacyFuture<>> futures;
+
+ for (size_t i = 0; i < Threads; ++i) {
+ futures.emplace_back(std::bind(&ThisType::Worker, this));
+ }
+ futures.clear();
+ TTest::DequeueAll(Stack);
+ }
+ };
+
+ struct TFreeListTest {
+ using ValueType = int;
+
+ static void Enqueue(TLockFreeStack<int>& stack, size_t i) {
+ stack.Enqueue(static_cast<int>(i));
+ }
+
+ static void Dequeue(TLockFreeStack<int>& stack) {
+ int value;
+ stack.Dequeue(&value);
+ }
+
+ static void EnqueueAll(TLockFreeStack<int>& stack) {
+ TVector<int> values(5);
+ stack.EnqueueAll(values);
+ }
+
+ static void DequeueAll(TLockFreeStack<int>& stack) {
+ TVector<int> value;
+ stack.DequeueAll(&value);
+ }
+ };
+
+ // Test for catching thread sanitizer problems
+ Y_UNIT_TEST(TestFreeList) {
+ TMultiThreadTester<TFreeListTest>().Run();
+ }
+
+ struct TMoveTest {
+ using ValueType = THolder<int>;
+
+ static void Enqueue(TLockFreeStack<ValueType>& stack, size_t i) {
+ stack.Enqueue(MakeHolder<int>(static_cast<int>(i)));
+ }
+
+ static void Dequeue(TLockFreeStack<ValueType>& stack) {
+ ValueType value;
+ if (stack.Dequeue(&value)) {
+ UNIT_ASSERT(value);
+ }
+ }
+
+ static void EnqueueAll(TLockFreeStack<ValueType>& stack) {
+ // there is no enqueAll with moving signature in LockFreeStack
+ Enqueue(stack, 0);
+ }
+
+ static void DequeueAll(TLockFreeStack<ValueType>& stack) {
+ TVector<ValueType> values;
+ stack.DequeueAll(&values);
+ for (auto& v : values) {
+ UNIT_ASSERT(v);
+ }
+ }
+ };
+
+ // Test for catching thread sanitizer problems
+ Y_UNIT_TEST(TesMultiThreadMove) {
+ TMultiThreadTester<TMoveTest>().Run();
+ }
+}