aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/threading/bounded_queue/bounded_queue_ut.cpp
blob: bb5b6eb787bcb00626eec5d6d0ed92c94fb33640 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
#include "bounded_queue.h"

#include <library/cpp/testing/unittest/registar.h>
#include <util/thread/factory.h>

using namespace NThreading;

Y_UNIT_TEST_SUITE(TBoundedQueueTest) {
    Y_UNIT_TEST(QueueSize) {
        const size_t queueSize = 16;
        TBoundedQueue<size_t> boundedQueue(queueSize);

        for (size_t i = 0; i < queueSize; ++i) {
            UNIT_ASSERT(boundedQueue.Enqueue(i));
        }
        UNIT_ASSERT(!boundedQueue.Enqueue(0));
        size_t tmp = 0;
        UNIT_ASSERT(boundedQueue.Dequeue(tmp));
        UNIT_ASSERT(boundedQueue.Enqueue(0));
        UNIT_ASSERT(!boundedQueue.Enqueue(0));
    }

    Y_UNIT_TEST(Move) {
        const size_t queueSize = 16;
        TBoundedQueue<TString> boundedQueue(queueSize);

        for (size_t i = 0; i < queueSize; ++i) {
            TString v = "xxx";
            UNIT_ASSERT(boundedQueue.Enqueue(std::move(v)));
            UNIT_ASSERT(v.empty());
        }

        {
            TString v = "xxx";
            UNIT_ASSERT(!boundedQueue.Enqueue(std::move(v)));
            UNIT_ASSERT(v == "xxx");
        }

        TString v;
        UNIT_ASSERT(boundedQueue.Dequeue(v));
        UNIT_ASSERT(v == "xxx");
    }

    Y_UNIT_TEST(MPMC) {
        size_t queueSize = 16;
        size_t producers = 10;
        size_t consumers = 10;
        size_t itemsCount = 10000;

        TVector<THolder<IThreadFactory::IThread>> threads;
        TBoundedQueue<std::pair<size_t, size_t>> boundedQueue(queueSize);

        std::atomic<size_t> itemCounter = 0;
        std::atomic<size_t> consumed = 0;

        for (size_t i = 0; i < consumers; ++i) {
            threads.push_back(SystemThreadFactory()->Run(
                [&]() {
                    TVector<size_t> prevItems(producers);
                    for (;;) {
                        std::pair<size_t, size_t> item;
                        while (!boundedQueue.Dequeue(item)) {
                            ;
                        }

                        if (item.first >= producers) {
                            break;
                        }

                        UNIT_ASSERT(item.second > prevItems[item.first]);
                        prevItems[item.first] = item.second;
                        ++consumed;
                    }
                })
            );
        }

        for (size_t i = 0; i < producers ; ++i) {
            threads.push_back(SystemThreadFactory()->Run(
                [&, producerNum = i]() {
                    for (;;) {
                        size_t item = ++itemCounter;
                        if (item > itemsCount) {
                            break;
                        }

                        while (!boundedQueue.Enqueue(std::make_pair(producerNum, item))) {
                            ;
                        }
                    }

                    while (!boundedQueue.Enqueue(std::make_pair(producers, size_t(0)))) {
                        ;
                    }
                })
            );
        }


        for (auto& t : threads) {
            t->Join();
        }

        UNIT_ASSERT_VALUES_EQUAL(consumed.load(), itemsCount);
    }
}