1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
|
#pragma once
#include "defs.h"
#include "queue_chunk.h"
// Unbounded FIFO queue of integral or pointer values, stored in a singly
// linked list of fixed-capacity chunks (TChunk::EntriesCount slots each).
//
// Push() only touches WriteTo/WritePosition, while Head()/Pop() only touch
// ReadFrom/ReadPosition; slots and chunk links are exchanged through
// AtomicStore/AtomicLoad.
// NOTE(review): the name and this split of state suggest exactly one producer
// thread and one consumer thread -- confirm against callers.
//
// The zero value (T{}) doubles as the "nothing here" marker: Head()/Pop()
// return it when no element is available and Pop() does not advance past it,
// so a zero/null element must not be pushed.
// NOTE(review): this relies on a freshly constructed TChunk having zeroed
// Entries and a null Next -- TChunk is defined elsewhere, confirm.
//
// Template parameters:
//   T      - element type; must be integral or a pointer (see static_assert).
//   TSize  - forwarded to the default TChunk.
//   TChunk - chunk type; must provide EntriesCount, Entries[] and Next.
template <typename T, ui32 TSize, typename TChunk = TQueueChunk<T, TSize>>
class TOneOneQueueInplace : TNonCopyable {
    static_assert(std::is_integral<T>::value || std::is_pointer<T>::value, "expect std::is_integral<T>::value || std::is_pointer<T>::valuer");

    // Reader side: chunk currently being consumed and the next slot to read.
    TChunk* ReadFrom;
    ui32 ReadPosition;
    // Writer side: next free slot and the chunk it belongs to.
    ui32 WritePosition;
    TChunk* WriteTo;

    // NOTE(review): this appears before the nested TReadIterator is declared,
    // so it most likely names a namespace-scope class rather than the nested
    // one; the nested iterator below only uses TChunk's public members.
    friend class TReadIterator;

public:
    // Read-only cursor over the elements of a queue, starting at the read
    // position captured when it was created. It never removes elements and
    // never frees chunks. It stores a raw chunk pointer, and Head() deletes
    // chunks once they are exhausted, so the cursor must not be used after the
    // queue has consumed past the chunk it points into.
    class TReadIterator {
        TChunk* ReadFrom;
        ui32 ReadPosition;

    public:
        TReadIterator(TChunk* readFrom, ui32 readPosition)
            : ReadFrom(readFrom)
            , ReadPosition(readPosition)
        {
        }

        // Returns the value in the current slot and steps to the next one,
        // following the chunk link when the current chunk is exhausted.
        // Returns T{} when the chunk is exhausted and no next chunk is linked.
        // The position is advanced even if the slot read was still empty
        // (zero), so a cursor that has returned T{} should not be reused to
        // pick up elements pushed later.
        inline T Next() {
            TChunk* head = ReadFrom;
            if (ReadPosition != TChunk::EntriesCount) {
                return AtomicLoad(&head->Entries[ReadPosition++]);
            } else if (TChunk* next = AtomicLoad(&head->Next)) {
                ReadFrom = next;
                ReadPosition = 0;
                return Next();
            }
            return T{};
        }
    };

    // Creates an empty queue with a single chunk shared by reader and writer.
    TOneOneQueueInplace()
        : ReadFrom(new TChunk())
        , ReadPosition(0)
        , WritePosition(0)
        , WriteTo(ReadFrom)
    {
    }

    // The queue is expected to be drained before destruction (checked by
    // Y_VERIFY_DEBUG). When that check is compiled out and elements remain,
    // more than one chunk can still be linked, so the whole chain starting at
    // ReadFrom is released instead of just the first chunk. Remaining elements
    // themselves are not destroyed here; use one of the *Destructor policies
    // below when the queue holds owning pointers.
    ~TOneOneQueueInplace() {
        Y_VERIFY_DEBUG(Head() == 0);
        for (TChunk* chunk = ReadFrom; chunk != nullptr;) {
            // Fetch the link first: deleting a chunk does not free its
            // successor (Head() relies on the same property).
            TChunk* next = AtomicLoad(&chunk->Next);
            delete chunk;
            chunk = next;
        }
    }

    // Deleter policy for queues of pointers allocated with `new`: pops and
    // deletes every remaining element, then deletes the queue itself.
    // NOTE(review): the parameter type uses the default TChunk, so this policy
    // does not match a queue instantiated with a custom chunk type.
    struct TPtrCleanDestructor {
        static inline void Destroy(TOneOneQueueInplace<T, TSize>* x) noexcept {
            while (T head = x->Pop())
                delete head;
            delete x;
        }
    };

    // Deleter policy that discards every remaining element without destroying
    // it, then deletes the queue. The loop tests the popped value for
    // truthiness (as Pop() itself does) so it works for integral T as well as
    // for pointers.
    struct TCleanDestructor {
        static inline void Destroy(TOneOneQueueInplace<T, TSize>* x) noexcept {
            while (x->Pop()) {
                // Nothing to do: elements are simply dropped.
            }
            delete x;
        }
    };

    // Deleter policy for queues of pointers whose storage came from malloc and
    // whose objects were constructed in place: runs the destructor explicitly,
    // releases the storage with free(), then deletes the queue.
    struct TPtrCleanInplaceMallocDestructor {
        template <typename TPtrVal>
        static inline void Destroy(TOneOneQueueInplace<TPtrVal*, TSize>* x) noexcept {
            while (TPtrVal* head = x->Pop()) {
                head->~TPtrVal();
                free(head);
            }
            delete x;
        }
    };

    // Appends x to the queue. x must be non-zero (see class comment).
    // Declared noexcept: if allocating a new chunk throws, the program
    // terminates.
    void Push(T x) noexcept {
        if (WritePosition != TChunk::EntriesCount) {
            AtomicStore(&WriteTo->Entries[WritePosition], x);
            ++WritePosition;
        } else {
            // Current chunk is full. The element is written into the new chunk
            // before the chunk is linked, so a reader that follows Next finds
            // it already in place (assuming AtomicStore publishes prior
            // writes -- it is defined elsewhere).
            TChunk* next = new TChunk();
            next->Entries[0] = x;
            AtomicStore(&WriteTo->Next, next);
            WriteTo = next;
            WritePosition = 1;
        }
    }

    // Returns the oldest element without removing it, or T{} when none is
    // available. Not const: when the current read chunk is exhausted and a
    // successor is linked, the exhausted chunk is deleted and reading moves on
    // to the successor.
    T Head() {
        TChunk* head = ReadFrom;
        if (ReadPosition != TChunk::EntriesCount) {
            return AtomicLoad(&head->Entries[ReadPosition]);
        } else if (TChunk* next = AtomicLoad(&head->Next)) {
            ReadFrom = next;
            delete head;
            ReadPosition = 0;
            // Retry on the new chunk; ReadPosition is 0 now, so for
            // EntriesCount > 0 this recurses at most once.
            return Head();
        }
        return T{};
    }

    // Removes and returns the oldest element, or returns T{} (leaving the read
    // position unchanged) when no element is available.
    T Pop() {
        T ret = Head();
        if (ret)
            ++ReadPosition;
        return ret;
    }

    // Returns a read-only cursor positioned at the current read position.
    TReadIterator Iterator() {
        return TReadIterator(ReadFrom, ReadPosition);
    }
};
|