diff options
author | Devtools Arcadia <arcadia-devtools@yandex-team.ru> | 2022-02-07 18:08:42 +0300 |
---|---|---|
committer | Devtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net> | 2022-02-07 18:08:42 +0300 |
commit | 1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch) | |
tree | e26c9fed0de5d9873cce7e00bc214573dc2195b7 /util/thread/lfstack_ut.cpp | |
download | ydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz |
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'util/thread/lfstack_ut.cpp')
-rw-r--r-- | util/thread/lfstack_ut.cpp | 346 |
1 files changed, 346 insertions, 0 deletions
diff --git a/util/thread/lfstack_ut.cpp b/util/thread/lfstack_ut.cpp new file mode 100644 index 0000000000..e20a838f95 --- /dev/null +++ b/util/thread/lfstack_ut.cpp @@ -0,0 +1,346 @@ + +#include <util/system/atomic.h> +#include <util/system/event.h> +#include <util/generic/deque.h> +#include <library/cpp/threading/future/legacy_future.h> + +#include <library/cpp/testing/unittest/registar.h> + +#include "lfstack.h" + +Y_UNIT_TEST_SUITE(TLockFreeStackTests) { + class TCountDownLatch { + private: + TAtomic Current_; + TSystemEvent EventObject_; + + public: + TCountDownLatch(unsigned initial) + : Current_(initial) + { + } + + void CountDown() { + if (AtomicDecrement(Current_) == 0) { + EventObject_.Signal(); + } + } + + void Await() { + EventObject_.Wait(); + } + + bool Await(TDuration timeout) { + return EventObject_.WaitT(timeout); + } + }; + + template <bool SingleConsumer> + struct TDequeueAllTester { + size_t EnqueueThreads; + size_t DequeueThreads; + + size_t EnqueuesPerThread; + TAtomic LeftToDequeue; + + TCountDownLatch StartLatch; + TLockFreeStack<int> Stack; + + TDequeueAllTester() + : EnqueueThreads(4) + , DequeueThreads(SingleConsumer ? 1 : 3) + , EnqueuesPerThread(100000) + , LeftToDequeue(EnqueueThreads * EnqueuesPerThread) + , StartLatch(EnqueueThreads + DequeueThreads) + { + } + + void Enqueuer() { + StartLatch.CountDown(); + StartLatch.Await(); + + for (size_t i = 0; i < EnqueuesPerThread; ++i) { + Stack.Enqueue(i); + } + } + + void DequeuerAll() { + StartLatch.CountDown(); + StartLatch.Await(); + + TVector<int> temp; + while (AtomicGet(LeftToDequeue) > 0) { + size_t dequeued = 0; + for (size_t i = 0; i < 100; ++i) { + temp.clear(); + if (SingleConsumer) { + Stack.DequeueAllSingleConsumer(&temp); + } else { + Stack.DequeueAll(&temp); + } + dequeued += temp.size(); + } + AtomicAdd(LeftToDequeue, -dequeued); + } + } + + void Run() { + TVector<TSimpleSharedPtr<NThreading::TLegacyFuture<>>> futures; + + for (size_t i = 0; i < EnqueueThreads; ++i) { + futures.push_back(new NThreading::TLegacyFuture<>(std::bind(&TDequeueAllTester<SingleConsumer>::Enqueuer, this))); + } + + for (size_t i = 0; i < DequeueThreads; ++i) { + futures.push_back(new NThreading::TLegacyFuture<>(std::bind(&TDequeueAllTester<SingleConsumer>::DequeuerAll, this))); + } + + // effectively join + futures.clear(); + + UNIT_ASSERT_VALUES_EQUAL(0, int(AtomicGet(LeftToDequeue))); + + TVector<int> left; + Stack.DequeueAll(&left); + UNIT_ASSERT(left.empty()); + } + }; + + Y_UNIT_TEST(TestDequeueAll) { + TDequeueAllTester<false>().Run(); + } + + Y_UNIT_TEST(TestDequeueAllSingleConsumer) { + TDequeueAllTester<true>().Run(); + } + + Y_UNIT_TEST(TestDequeueAllEmptyStack) { + TLockFreeStack<int> stack; + + TVector<int> r; + stack.DequeueAll(&r); + + UNIT_ASSERT(r.empty()); + } + + Y_UNIT_TEST(TestDequeueAllReturnsInReverseOrder) { + TLockFreeStack<int> stack; + + stack.Enqueue(17); + stack.Enqueue(19); + stack.Enqueue(23); + + TVector<int> r; + + stack.DequeueAll(&r); + + UNIT_ASSERT_VALUES_EQUAL(size_t(3), r.size()); + UNIT_ASSERT_VALUES_EQUAL(23, r.at(0)); + UNIT_ASSERT_VALUES_EQUAL(19, r.at(1)); + UNIT_ASSERT_VALUES_EQUAL(17, r.at(2)); + } + + Y_UNIT_TEST(TestEnqueueAll) { + TLockFreeStack<int> stack; + + TVector<int> v; + TVector<int> expected; + + stack.EnqueueAll(v); // add empty + + v.push_back(2); + v.push_back(3); + v.push_back(5); + expected.insert(expected.end(), v.begin(), v.end()); + stack.EnqueueAll(v); + + v.clear(); + + stack.EnqueueAll(v); // add empty + + v.push_back(7); + v.push_back(11); + v.push_back(13); + v.push_back(17); + expected.insert(expected.end(), v.begin(), v.end()); + stack.EnqueueAll(v); + + TVector<int> actual; + stack.DequeueAll(&actual); + + UNIT_ASSERT_VALUES_EQUAL(expected.size(), actual.size()); + for (size_t i = 0; i < actual.size(); ++i) { + UNIT_ASSERT_VALUES_EQUAL(expected.at(expected.size() - i - 1), actual.at(i)); + } + } + + Y_UNIT_TEST(CleanInDestructor) { + TSimpleSharedPtr<bool> p(new bool); + UNIT_ASSERT_VALUES_EQUAL(1u, p.RefCount()); + + { + TLockFreeStack<TSimpleSharedPtr<bool>> stack; + + stack.Enqueue(p); + stack.Enqueue(p); + + UNIT_ASSERT_VALUES_EQUAL(3u, p.RefCount()); + } + + UNIT_ASSERT_VALUES_EQUAL(1, p.RefCount()); + } + + Y_UNIT_TEST(NoCopyTest) { + static unsigned copied = 0; + struct TCopyCount { + TCopyCount(int) { + } + TCopyCount(const TCopyCount&) { + ++copied; + } + + TCopyCount(TCopyCount&&) { + } + + TCopyCount& operator=(const TCopyCount&) { + ++copied; + return *this; + } + + TCopyCount& operator=(TCopyCount&&) { + return *this; + } + }; + + TLockFreeStack<TCopyCount> stack; + stack.Enqueue(TCopyCount(1)); + TCopyCount val(0); + stack.Dequeue(&val); + UNIT_ASSERT_VALUES_EQUAL(0, copied); + } + + Y_UNIT_TEST(MoveOnlyTest) { + TLockFreeStack<THolder<bool>> stack; + stack.Enqueue(MakeHolder<bool>(true)); + THolder<bool> val; + stack.Dequeue(&val); + UNIT_ASSERT(val); + UNIT_ASSERT_VALUES_EQUAL(true, *val); + } + + template <class TTest> + struct TMultiThreadTester { + using ThisType = TMultiThreadTester<TTest>; + + size_t Threads; + size_t OperationsPerThread; + + TCountDownLatch StartLatch; + TLockFreeStack<typename TTest::ValueType> Stack; + + TMultiThreadTester() + : Threads(10) + , OperationsPerThread(100000) + , StartLatch(Threads) + { + } + + void Worker() { + StartLatch.CountDown(); + StartLatch.Await(); + + TVector<typename TTest::ValueType> unused; + for (size_t i = 0; i < OperationsPerThread; ++i) { + switch (GetCycleCount() % 4) { + case 0: { + TTest::Enqueue(Stack, i); + break; + } + case 1: { + TTest::Dequeue(Stack); + break; + } + case 2: { + TTest::EnqueueAll(Stack); + break; + } + case 3: { + TTest::DequeueAll(Stack); + break; + } + } + } + } + + void Run() { + TDeque<NThreading::TLegacyFuture<>> futures; + + for (size_t i = 0; i < Threads; ++i) { + futures.emplace_back(std::bind(&ThisType::Worker, this)); + } + futures.clear(); + TTest::DequeueAll(Stack); + } + }; + + struct TFreeListTest { + using ValueType = int; + + static void Enqueue(TLockFreeStack<int>& stack, size_t i) { + stack.Enqueue(static_cast<int>(i)); + } + + static void Dequeue(TLockFreeStack<int>& stack) { + int value; + stack.Dequeue(&value); + } + + static void EnqueueAll(TLockFreeStack<int>& stack) { + TVector<int> values(5); + stack.EnqueueAll(values); + } + + static void DequeueAll(TLockFreeStack<int>& stack) { + TVector<int> value; + stack.DequeueAll(&value); + } + }; + + // Test for catching thread sanitizer problems + Y_UNIT_TEST(TestFreeList) { + TMultiThreadTester<TFreeListTest>().Run(); + } + + struct TMoveTest { + using ValueType = THolder<int>; + + static void Enqueue(TLockFreeStack<ValueType>& stack, size_t i) { + stack.Enqueue(MakeHolder<int>(static_cast<int>(i))); + } + + static void Dequeue(TLockFreeStack<ValueType>& stack) { + ValueType value; + if (stack.Dequeue(&value)) { + UNIT_ASSERT(value); + } + } + + static void EnqueueAll(TLockFreeStack<ValueType>& stack) { + // there is no enqueAll with moving signature in LockFreeStack + Enqueue(stack, 0); + } + + static void DequeueAll(TLockFreeStack<ValueType>& stack) { + TVector<ValueType> values; + stack.DequeueAll(&values); + for (auto& v : values) { + UNIT_ASSERT(v); + } + } + }; + + // Test for catching thread sanitizer problems + Y_UNIT_TEST(TesMultiThreadMove) { + TMultiThreadTester<TMoveTest>().Run(); + } +} |