aboutsummaryrefslogblamecommitdiffstats
path: root/util/thread/lfqueue_ut.cpp
blob: 626f7d4091a3aad064a1e3585f8056bca929f5cf (plain) (tree)
1
2
3
4
5
6
7
8
9
                                                  
 
                                   
                                
                             
 
                    
 








































                                                  

                          
                       

                                              
                    

                                                   
                    

                                                         
                      


                                                              
                      




























                                                                                                    
                                        















                                                       
                                           
                                  
                   
                                        
                                        
















                                        



                                         
     
                                              
                                  
                   
                                        
                                        
 
                       


























                                           
                                                                            


















                                                                          
                                                                          









                                                                                                     
                                                                   


                                                       
                                                   






                                                                                          
                                                               





























































                                                                          
                                    
                                           

                                                   
                                                         







                                                       
 
                                       









                                                 
 
#include <library/cpp/threading/future/future.h>
#include <library/cpp/testing/unittest/registar.h>

#include <util/generic/algorithm.h>
#include <util/generic/vector.h>
#include <util/generic/ptr.h>
#include <util/thread/pool.h>

#include "lfqueue.h"

class TMoveTest {
public:
    TMoveTest(int marker = 0, int value = 0)
        : Marker_(marker)
        , Value_(value)
    {
    }

    TMoveTest(const TMoveTest& other) {
        *this = other;
    }

    TMoveTest(TMoveTest&& other) {
        *this = std::move(other);
    }

    TMoveTest& operator=(const TMoveTest& other) {
        Value_ = other.Value_;
        Marker_ = other.Marker_ + 1024;
        return *this;
    }

    TMoveTest& operator=(TMoveTest&& other) {
        Value_ = other.Value_;
        Marker_ = other.Marker_;
        other.Marker_ = 0;
        return *this;
    }

    int Marker() const {
        return Marker_;
    }

    int Value() const {
        return Value_;
    }

private:
    int Marker_ = 0;
    int Value_ = 0;
};

class TOperationsChecker {
public:
    TOperationsChecker() {
        ++DefaultCtor_;
    }

    TOperationsChecker(TOperationsChecker&&) {
        ++MoveCtor_;
    }

    TOperationsChecker(const TOperationsChecker&) {
        ++CopyCtor_;
    }

    TOperationsChecker& operator=(TOperationsChecker&&) {
        ++MoveAssign_;
        return *this;
    }

    TOperationsChecker& operator=(const TOperationsChecker&) {
        ++CopyAssign_;
        return *this;
    }

    static void Check(int defaultCtor, int moveCtor, int copyCtor, int moveAssign, int copyAssign) {
        UNIT_ASSERT_VALUES_EQUAL(defaultCtor, DefaultCtor_);
        UNIT_ASSERT_VALUES_EQUAL(moveCtor, MoveCtor_);
        UNIT_ASSERT_VALUES_EQUAL(copyCtor, CopyCtor_);
        UNIT_ASSERT_VALUES_EQUAL(moveAssign, MoveAssign_);
        UNIT_ASSERT_VALUES_EQUAL(copyAssign, CopyAssign_);
        Clear();
    }

private:
    static void Clear() {
        DefaultCtor_ = MoveCtor_ = CopyCtor_ = MoveAssign_ = CopyAssign_ = 0;
    }

    static int DefaultCtor_;
    static int MoveCtor_;
    static int CopyCtor_;
    static int MoveAssign_;
    static int CopyAssign_;
};

int TOperationsChecker::DefaultCtor_ = 0;
int TOperationsChecker::MoveCtor_ = 0;
int TOperationsChecker::CopyCtor_ = 0;
int TOperationsChecker::MoveAssign_ = 0;
int TOperationsChecker::CopyAssign_ = 0;

Y_UNIT_TEST_SUITE(TLockFreeQueueTests) {
    Y_UNIT_TEST(TestMoveEnqueue) {
        TMoveTest value(0xFF, 0xAA);
        TMoveTest tmp;

        TLockFreeQueue<TMoveTest> queue;

        queue.Enqueue(value);
        UNIT_ASSERT_VALUES_EQUAL(value.Marker(), 0xFF);
        UNIT_ASSERT(queue.Dequeue(&tmp));
        UNIT_ASSERT_VALUES_UNEQUAL(tmp.Marker(), 0xFF);
        UNIT_ASSERT_VALUES_EQUAL(tmp.Value(), 0xAA);

        queue.Enqueue(std::move(value));
        UNIT_ASSERT_VALUES_EQUAL(value.Marker(), 0);
        UNIT_ASSERT(queue.Dequeue(&tmp));
        UNIT_ASSERT_VALUES_EQUAL(tmp.Value(), 0xAA);
    }

    Y_UNIT_TEST(TestSimpleEnqueueDequeue) {
        TLockFreeQueue<int> queue;

        int i = -1;

        UNIT_ASSERT(!queue.Dequeue(&i));
        UNIT_ASSERT_VALUES_EQUAL(i, -1);

        queue.Enqueue(10);
        queue.Enqueue(11);
        queue.Enqueue(12);

        UNIT_ASSERT(queue.Dequeue(&i));
        UNIT_ASSERT_VALUES_EQUAL(10, i);
        UNIT_ASSERT(queue.Dequeue(&i));
        UNIT_ASSERT_VALUES_EQUAL(11, i);

        queue.Enqueue(13);

        UNIT_ASSERT(queue.Dequeue(&i));
        UNIT_ASSERT_VALUES_EQUAL(12, i);
        UNIT_ASSERT(queue.Dequeue(&i));
        UNIT_ASSERT_VALUES_EQUAL(13, i);

        UNIT_ASSERT(!queue.Dequeue(&i));

        const int tmp = 100;
        queue.Enqueue(tmp);
        UNIT_ASSERT(queue.Dequeue(&i));
        UNIT_ASSERT_VALUES_EQUAL(i, tmp);
    }

    Y_UNIT_TEST(TestSimpleEnqueueAllDequeue) {
        TLockFreeQueue<int> queue;

        int i = -1;

        UNIT_ASSERT(!queue.Dequeue(&i));
        UNIT_ASSERT_VALUES_EQUAL(i, -1);

        TVector<int> v;
        v.push_back(20);
        v.push_back(21);

        queue.EnqueueAll(v);

        v.clear();
        v.push_back(22);
        v.push_back(23);
        v.push_back(24);

        queue.EnqueueAll(v);

        v.clear();
        queue.EnqueueAll(v);

        v.clear();
        v.push_back(25);

        queue.EnqueueAll(v);

        for (int j = 20; j <= 25; ++j) {
            UNIT_ASSERT(queue.Dequeue(&i));
            UNIT_ASSERT_VALUES_EQUAL(j, i);
        }

        UNIT_ASSERT(!queue.Dequeue(&i));
    }

    void DequeueAllRunner(TLockFreeQueue<int>& queue, bool singleConsumer) {
        size_t threadsNum = 4;
        size_t enqueuesPerThread = 10'000;
        TThreadPool p;
        p.Start(threadsNum, 0);

        TVector<NThreading::TFuture<void>> futures;

        for (size_t i = 0; i < threadsNum; ++i) {
            NThreading::TPromise<void> promise = NThreading::NewPromise();
            futures.emplace_back(promise.GetFuture());

            p.SafeAddFunc([enqueuesPerThread, &queue, promise]() mutable {
                for (size_t i = 0; i != enqueuesPerThread; ++i) {
                    queue.Enqueue(i);
                }

                promise.SetValue();
            });
        }

        std::atomic<size_t> elementsLeft = threadsNum * enqueuesPerThread;

        ui64 numOfConsumers = singleConsumer ? 1 : threadsNum;

        TVector<TVector<int>> dataBuckets(numOfConsumers);

        for (size_t i = 0; i < numOfConsumers; ++i) {
            NThreading::TPromise<void> promise = NThreading::NewPromise();
            futures.emplace_back(promise.GetFuture());

            p.SafeAddFunc([&queue, &elementsLeft, promise, consumerData{&dataBuckets[i]}]() mutable {
                TVector<int> vec;
                while (static_cast<i64>(elementsLeft.load()) > 0) {
                    for (size_t i = 0; i != 100; ++i) {
                        vec.clear();
                        queue.DequeueAll(&vec);

                        elementsLeft -= vec.size();
                        consumerData->insert(consumerData->end(), vec.begin(), vec.end());
                    }
                }

                promise.SetValue();
            });
        }

        NThreading::WaitExceptionOrAll(futures).GetValueSync();
        p.Stop();

        TVector<int> left;
        queue.DequeueAll(&left);

        UNIT_ASSERT(left.empty());

        TVector<int> data;
        for (auto& dataBucket : dataBuckets) {
            data.insert(data.end(), dataBucket.begin(), dataBucket.end());
        }

        UNIT_ASSERT_EQUAL(data.size(), threadsNum * enqueuesPerThread);

        size_t threadIdx = 0;
        size_t cntValue = 0;

        Sort(data.begin(), data.end());
        for (size_t i = 0; i != data.size(); ++i) {
            UNIT_ASSERT_VALUES_EQUAL(cntValue, data[i]);
            ++threadIdx;

            if (threadIdx == threadsNum) {
                ++cntValue;
                threadIdx = 0;
            }
        }
    }

    Y_UNIT_TEST(TestDequeueAllSingleConsumer) {
        TLockFreeQueue<int> queue;
        DequeueAllRunner(queue, true);
    }

    Y_UNIT_TEST(TestDequeueAllMultipleConsumers) {
        TLockFreeQueue<int> queue;
        DequeueAllRunner(queue, false);
    }

    Y_UNIT_TEST(TestDequeueAllEmptyQueue) {
        TLockFreeQueue<int> queue;
        TVector<int> vec;

        queue.DequeueAll(&vec);

        UNIT_ASSERT(vec.empty());
    }

    Y_UNIT_TEST(TestDequeueAllQueueOrder) {
        TLockFreeQueue<int> queue;
        queue.Enqueue(1);
        queue.Enqueue(2);
        queue.Enqueue(3);

        TVector<int> v;
        queue.DequeueAll(&v);

        UNIT_ASSERT_VALUES_EQUAL(v.size(), 3);
        UNIT_ASSERT_VALUES_EQUAL(v[0], 1);
        UNIT_ASSERT_VALUES_EQUAL(v[1], 2);
        UNIT_ASSERT_VALUES_EQUAL(v[2], 3);
    }

    Y_UNIT_TEST(CleanInDestructor) {
        TSimpleSharedPtr<bool> p(new bool);
        UNIT_ASSERT_VALUES_EQUAL(1u, p.RefCount());

        {
            TLockFreeQueue<TSimpleSharedPtr<bool>> stack;

            stack.Enqueue(p);
            stack.Enqueue(p);

            UNIT_ASSERT_VALUES_EQUAL(3u, p.RefCount());
        }

        UNIT_ASSERT_VALUES_EQUAL(1, p.RefCount());
    }

    Y_UNIT_TEST(CheckOperationsCount) {
        TOperationsChecker o;
        o.Check(1, 0, 0, 0, 0);
        TLockFreeQueue<TOperationsChecker> queue;
        o.Check(0, 0, 0, 0, 0);
        queue.Enqueue(std::move(o));
        o.Check(0, 1, 0, 0, 0);
        queue.Enqueue(o);
        o.Check(0, 0, 1, 0, 0);
        queue.Dequeue(&o);
        o.Check(0, 0, 2, 1, 0);
    }
}