aboutsummaryrefslogblamecommitdiffstats
path: root/util/thread/lfstack_ut.cpp
blob: e0527aa50122dede030a5af8fbea3104747fc5b3 (plain) (tree)
1
2
3
4
5
6
7
8
9
10
 
                    
 
                                                  
                                                       
 


                               
 
                                        
                           
                                     
                                  
 
                                         
                               
         
                          
                                  
                                      


                      
                                

                                       
                                               







                                  
                                          
























                                                               
                              
                                              








                                                              
                                          


                    
                                                                           
                                                         
                                                                                                                                  

                                                         
                                                                                                                                     



                               
                                                                   
 
                              



                                      
                                 

                                         
                                               

                                        
                                           
                                  
                       



                               
                                                      




                                  
                       







                                                      
                                 
                                  
                              


















                                                            
                            






                                                                                         
                                    
                                           

                                                   
                                                         







                                                       
 

                                   



                                           
 
                                      





























                                                      


                                   
                                                        
 
                            








                                         
                                                      

                                                              
                                                 

                              
                                              

                              
                                                 

                              
                                                 








                                                        
                                                                         
                            







                                                                   
         












                                                            
         

                                                  
                               
                                                  
     






























                                                                          
     
 
#include "lfstack.h"

#include <library/cpp/testing/unittest/registar.h>
#include <library/cpp/threading/future/legacy_future.h>

#include <util/generic/deque.h>
#include <util/system/event.h>

#include <atomic>

Y_UNIT_TEST_SUITE(TLockFreeStackTests) {
    class TCountDownLatch {
    private:
        std::atomic<size_t> Current_;
        TSystemEvent EventObject_;

    public:
        TCountDownLatch(unsigned initial)
            : Current_(initial)
        {
        }

        void CountDown() {
            if (--Current_ == 0) {
                EventObject_.Signal();
            }
        }

        void Await() {
            EventObject_.Wait();
        }

        bool Await(TDuration timeout) {
            return EventObject_.WaitT(timeout);
        }
    };

    template <bool SingleConsumer>
    struct TDequeueAllTester {
        size_t EnqueueThreads;
        size_t DequeueThreads;

        size_t EnqueuesPerThread;
        std::atomic<size_t> LeftToDequeue;

        TCountDownLatch StartLatch;
        TLockFreeStack<int> Stack;

        TDequeueAllTester()
            : EnqueueThreads(4)
            , DequeueThreads(SingleConsumer ? 1 : 3)
            , EnqueuesPerThread(100000)
            , LeftToDequeue(EnqueueThreads * EnqueuesPerThread)
            , StartLatch(EnqueueThreads + DequeueThreads)
        {
        }

        void Enqueuer() {
            StartLatch.CountDown();
            StartLatch.Await();

            for (size_t i = 0; i < EnqueuesPerThread; ++i) {
                Stack.Enqueue(i);
            }
        }

        void DequeuerAll() {
            StartLatch.CountDown();
            StartLatch.Await();

            TVector<int> temp;
            while (LeftToDequeue.load() > 0) {
                size_t dequeued = 0;
                for (size_t i = 0; i < 100; ++i) {
                    temp.clear();
                    if (SingleConsumer) {
                        Stack.DequeueAllSingleConsumer(&temp);
                    } else {
                        Stack.DequeueAll(&temp);
                    }
                    dequeued += temp.size();
                }
                LeftToDequeue -= dequeued;
            }
        }

        void Run() {
            TVector<TSimpleSharedPtr<NThreading::TLegacyFuture<>>> futures;

            for (size_t i = 0; i < EnqueueThreads; ++i) {
                futures.push_back(new NThreading::TLegacyFuture<>(std::bind(&TDequeueAllTester<SingleConsumer>::Enqueuer, this)));
            }

            for (size_t i = 0; i < DequeueThreads; ++i) {
                futures.push_back(new NThreading::TLegacyFuture<>(std::bind(&TDequeueAllTester<SingleConsumer>::DequeuerAll, this)));
            }

            // effectively join
            futures.clear();

            UNIT_ASSERT_VALUES_EQUAL(0, int(LeftToDequeue.load()));

            TVector<int> left;
            Stack.DequeueAll(&left);
            UNIT_ASSERT(left.empty());
        }
    };

    Y_UNIT_TEST(TestDequeueAll) {
        TDequeueAllTester<false>().Run();
    }

    Y_UNIT_TEST(TestDequeueAllSingleConsumer) {
        TDequeueAllTester<true>().Run();
    }

    Y_UNIT_TEST(TestDequeueAllEmptyStack) {
        TLockFreeStack<int> stack;

        TVector<int> r;
        stack.DequeueAll(&r);

        UNIT_ASSERT(r.empty());
    }

    Y_UNIT_TEST(TestDequeueAllReturnsInReverseOrder) {
        TLockFreeStack<int> stack;

        stack.Enqueue(17);
        stack.Enqueue(19);
        stack.Enqueue(23);

        TVector<int> r;

        stack.DequeueAll(&r);

        UNIT_ASSERT_VALUES_EQUAL(size_t(3), r.size());
        UNIT_ASSERT_VALUES_EQUAL(23, r.at(0));
        UNIT_ASSERT_VALUES_EQUAL(19, r.at(1));
        UNIT_ASSERT_VALUES_EQUAL(17, r.at(2));
    }

    Y_UNIT_TEST(TestEnqueueAll) {
        TLockFreeStack<int> stack;

        TVector<int> v;
        TVector<int> expected;

        stack.EnqueueAll(v); // add empty

        v.push_back(2);
        v.push_back(3);
        v.push_back(5);
        expected.insert(expected.end(), v.begin(), v.end());
        stack.EnqueueAll(v);

        v.clear();

        stack.EnqueueAll(v); // add empty

        v.push_back(7);
        v.push_back(11);
        v.push_back(13);
        v.push_back(17);
        expected.insert(expected.end(), v.begin(), v.end());
        stack.EnqueueAll(v);

        TVector<int> actual;
        stack.DequeueAll(&actual);

        UNIT_ASSERT_VALUES_EQUAL(expected.size(), actual.size());
        for (size_t i = 0; i < actual.size(); ++i) {
            UNIT_ASSERT_VALUES_EQUAL(expected.at(expected.size() - i - 1), actual.at(i));
        }
    }

    Y_UNIT_TEST(CleanInDestructor) {
        TSimpleSharedPtr<bool> p(new bool);
        UNIT_ASSERT_VALUES_EQUAL(1u, p.RefCount());

        {
            TLockFreeStack<TSimpleSharedPtr<bool>> stack;

            stack.Enqueue(p);
            stack.Enqueue(p);

            UNIT_ASSERT_VALUES_EQUAL(3u, p.RefCount());
        }

        UNIT_ASSERT_VALUES_EQUAL(1, p.RefCount());
    }

    Y_UNIT_TEST(NoCopyTest) {
        static unsigned copied = 0;
        struct TCopyCount {
            TCopyCount(int) {
            }
            TCopyCount(const TCopyCount&) {
                ++copied;
            }

            TCopyCount(TCopyCount&&) {
            }

            TCopyCount& operator=(const TCopyCount&) {
                ++copied;
                return *this;
            }

            TCopyCount& operator=(TCopyCount&&) {
                return *this;
            }
        };

        TLockFreeStack<TCopyCount> stack;
        stack.Enqueue(TCopyCount(1));
        TCopyCount val(0);
        stack.Dequeue(&val);
        UNIT_ASSERT_VALUES_EQUAL(0, copied);
    }

    Y_UNIT_TEST(MoveOnlyTest) {
        TLockFreeStack<THolder<bool>> stack;
        stack.Enqueue(MakeHolder<bool>(true));
        THolder<bool> val;
        stack.Dequeue(&val);
        UNIT_ASSERT(val);
        UNIT_ASSERT_VALUES_EQUAL(true, *val);
    }

    template <class TTest>
    struct TMultiThreadTester {
        using ThisType = TMultiThreadTester<TTest>;

        size_t Threads;
        size_t OperationsPerThread;

        TCountDownLatch StartLatch;
        TLockFreeStack<typename TTest::ValueType> Stack;

        TMultiThreadTester()
            : Threads(10)
            , OperationsPerThread(100000)
            , StartLatch(Threads)
        {
        }

        void Worker() {
            StartLatch.CountDown();
            StartLatch.Await();

            TVector<typename TTest::ValueType> unused;
            for (size_t i = 0; i < OperationsPerThread; ++i) {
                switch (GetCycleCount() % 4) {
                    case 0: {
                        TTest::Enqueue(Stack, i);
                        break;
                    }
                    case 1: {
                        TTest::Dequeue(Stack);
                        break;
                    }
                    case 2: {
                        TTest::EnqueueAll(Stack);
                        break;
                    }
                    case 3: {
                        TTest::DequeueAll(Stack);
                        break;
                    }
                }
            }
        }

        void Run() {
            TDeque<NThreading::TLegacyFuture<>> futures;

            for (size_t i = 0; i < Threads; ++i) {
                futures.emplace_back(std::bind(&ThisType::Worker, this));
            }
            futures.clear();
            TTest::DequeueAll(Stack);
        }
    };

    struct TFreeListTest {
        using ValueType = int;

        static void Enqueue(TLockFreeStack<int>& stack, size_t i) {
            stack.Enqueue(static_cast<int>(i));
        }

        static void Dequeue(TLockFreeStack<int>& stack) {
            int value;
            stack.Dequeue(&value);
        }

        static void EnqueueAll(TLockFreeStack<int>& stack) {
            TVector<int> values(5);
            stack.EnqueueAll(values);
        }

        static void DequeueAll(TLockFreeStack<int>& stack) {
            TVector<int> value;
            stack.DequeueAll(&value);
        }
    };

    // Test for catching thread sanitizer problems
    Y_UNIT_TEST(TestFreeList) {
        TMultiThreadTester<TFreeListTest>().Run();
    }

    struct TMoveTest {
        using ValueType = THolder<int>;

        static void Enqueue(TLockFreeStack<ValueType>& stack, size_t i) {
            stack.Enqueue(MakeHolder<int>(static_cast<int>(i)));
        }

        static void Dequeue(TLockFreeStack<ValueType>& stack) {
            ValueType value;
            if (stack.Dequeue(&value)) {
                UNIT_ASSERT(value);
            }
        }

        static void EnqueueAll(TLockFreeStack<ValueType>& stack) {
            // there is no enqueAll with moving signature in LockFreeStack
            Enqueue(stack, 0);
        }

        static void DequeueAll(TLockFreeStack<ValueType>& stack) {
            TVector<ValueType> values;
            stack.DequeueAll(&values);
            for (auto& v : values) {
                UNIT_ASSERT(v);
            }
        }
    };

    // Test for catching thread sanitizer problems
    Y_UNIT_TEST(TesMultiThreadMove) {
        TMultiThreadTester<TMoveTest>().Run();
    }
}