// path: root/library/cpp/actors/util/queue_oneone_inplace.h
// blob: 288011955a8b3bf065072ff61c7afbcd5c7f9201
#pragma once

#include "defs.h"
#include "queue_chunk.h"

// Unbounded FIFO queue of integral or pointer values, stored in a singly
// linked list of fixed-capacity chunks (TChunk::EntriesCount slots each).
//
// The value-initialized T (0 / nullptr) marks an empty slot, so it must never
// be pushed: Head(), Pop() and TReadIterator::Next() return T{} to report
// "nothing to read".
//
// NOTE(review): the class name and the AtomicLoad/AtomicStore pairs suggest a
// single producer thread (Push) and a single consumer thread (Head/Pop); the
// class itself takes no locks — confirm the threading contract with callers.
// NOTE(review): the code relies on a freshly constructed TChunk having all
// Entries equal to T{} and Next == nullptr — presumably guaranteed by
// queue_chunk.h.
template <typename T, ui32 TSize, typename TChunk = TQueueChunk<T, TSize>>
class TOneOneQueueInplace : TNonCopyable {
    static_assert(std::is_integral<T>::value || std::is_pointer<T>::value, "expect std::is_integral<T>::value || std::is_pointer<T>::value");

    TChunk* ReadFrom;   // chunk the reader is currently positioned in
    ui32 ReadPosition;  // index of the next slot to read inside ReadFrom
    ui32 WritePosition; // index of the next free slot inside WriteTo
    TChunk* WriteTo;    // chunk the writer is currently appending to

public:
    // Read-only cursor over the queue contents. It never removes elements and
    // never frees chunks. Because Head()/Pop() delete a chunk as soon as the
    // reader moves past it, a cursor must not be used after the queue has
    // advanced beyond the chunk the cursor points into.
    class TReadIterator {
        TChunk* ReadFrom;
        ui32 ReadPosition;

    public:
        TReadIterator(TChunk* readFrom, ui32 readPosition)
            : ReadFrom(readFrom)
            , ReadPosition(readPosition)
        {
        }

        // Returns the next stored element, or T{} when there is none yet.
        // The cursor only advances past a slot that actually holds a value, so
        // an element pushed later into that slot is still returned by a
        // subsequent call (same rule as TOneOneQueueInplace::Pop()).
        inline T Next() {
            for (;;) {
                if (ReadPosition != TChunk::EntriesCount) {
                    const T value = AtomicLoad(&ReadFrom->Entries[ReadPosition]);
                    if (value)
                        ++ReadPosition;
                    return value;
                }
                // Current chunk is exhausted: follow the link if the writer
                // has already published the next chunk.
                TChunk* next = AtomicLoad(&ReadFrom->Next);
                if (!next)
                    return T{};
                ReadFrom = next;
                ReadPosition = 0;
            }
        }
    };

    // Creates an empty queue with a single chunk shared by reader and writer.
    TOneOneQueueInplace()
        : ReadFrom(new TChunk())
        , ReadPosition(0)
        , WritePosition(0)
        , WriteTo(ReadFrom)
    {
    }

    // The queue is expected to be drained before destruction (checked in
    // debug builds). Every chunk still linked from ReadFrom is released, so a
    // non-empty queue does not leak chunks when the check is compiled out; the
    // elements themselves are not touched.
    ~TOneOneQueueInplace() {
        Y_DEBUG_ABORT_UNLESS(Head() == 0);
        TChunk* chunk = ReadFrom;
        while (chunk) {
            TChunk* next = AtomicLoad(&chunk->Next);
            delete chunk;
            chunk = next;
        }
    }

    // Deleter for queues of pointers: pops every element, deletes it with
    // `delete`, then deletes the queue.
    struct TPtrCleanDestructor {
        static inline void Destroy(TOneOneQueueInplace* x) noexcept {
            while (T head = x->Pop())
                delete head;
            delete x;
        }
    };

    // Deleter that discards all remaining elements without destroying them,
    // then deletes the queue. Works for both pointer and integral T.
    struct TCleanDestructor {
        static inline void Destroy(TOneOneQueueInplace* x) noexcept {
            while (x->Pop())
                continue;
            delete x;
        }
    };

    // Deleter for queues of pointers to objects that are destroyed by an
    // explicit destructor call followed by free(): pops every element,
    // destroys and frees it, then deletes the queue.
    struct TPtrCleanInplaceMallocDestructor {
        // TAnyChunk is deduced so that queues with a non-default chunk type
        // are accepted as well.
        template <typename TPtrVal, typename TAnyChunk>
        static inline void Destroy(TOneOneQueueInplace<TPtrVal*, TSize, TAnyChunk>* x) noexcept {
            while (TPtrVal* head = x->Pop()) {
                head->~TPtrVal();
                free(head);
            }
            delete x;
        }
    };

    // Appends x to the tail. x must not be T{} (that value means "empty").
    // A new chunk is allocated when the current one is full; since the
    // function is noexcept, an allocation failure terminates the program.
    void Push(T x) noexcept {
        if (WritePosition != TChunk::EntriesCount) {
            AtomicStore(&WriteTo->Entries[WritePosition], x);
            ++WritePosition;
        } else {
            TChunk* next = new TChunk();
            // The chunk is not linked yet, so the first slot is filled before
            // the chunk is published through Next.
            next->Entries[0] = x;
            AtomicStore(&WriteTo->Next, next);
            WriteTo = next;
            WritePosition = 1;
        }
    }

    // Returns the oldest element without removing it, or T{} if the queue is
    // empty. Side effect: once the current chunk is fully consumed and a next
    // chunk exists, the consumed chunk is deleted and reading continues in the
    // next one.
    T Head() {
        for (;;) {
            TChunk* head = ReadFrom;
            if (ReadPosition != TChunk::EntriesCount)
                return AtomicLoad(&head->Entries[ReadPosition]);
            TChunk* next = AtomicLoad(&head->Next);
            if (!next)
                return T{};
            ReadFrom = next;
            delete head;
            ReadPosition = 0;
        }
    }

    // Removes and returns the oldest element, or returns T{} (leaving the
    // queue unchanged) if there is nothing to read.
    T Pop() {
        T ret = Head();
        if (ret)
            ++ReadPosition;
        return ret;
    }

    // Returns a read-only cursor starting at the current read position.
    TReadIterator Iterator() {
        return TReadIterator(ReadFrom, ReadPosition);
    }
};