blob: d4e359b31c7c626e6ee2e96da722ece688ea2994 (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
|
#ifndef SPSC_QUEUE_INL_H_
#error "Direct inclusion of this file is not allowed, include spsc_queue.h"
// For the sake of sane code completion.
#include "spsc_queue.h"
#endif
namespace NYT {
////////////////////////////////////////////////////////////////////////////////
template <class T>
struct TSpscQueue<T>::TNode
{
std::atomic<TNode*> Next = nullptr;
size_t Offset = 0;
T Data[BufferSize];
};
template <class T>
TSpscQueue<T>::TSpscQueue()
{
auto* initialNode = new TNode();
Head_ = initialNode;
Tail_ = initialNode;
}
template <class T>
TSpscQueue<T>::~TSpscQueue()
{
auto current = Head_;
while (current) {
auto next = current->Next.load(std::memory_order::acquire);
delete current;
current = next;
}
}
template <class T>
void TSpscQueue<T>::Push(T&& element)
{
auto count = Count_.load(std::memory_order::acquire);
size_t position = count - Tail_->Offset;
if (Y_UNLIKELY(position == BufferSize)) {
auto oldTail = Tail_;
Tail_ = new TNode();
Tail_->Offset = count;
oldTail->Next.store(Tail_);
position = 0;
}
Tail_->Data[position] = std::move(element);
Count_.store(count + 1, std::memory_order::release);
}
template <class T>
T* TSpscQueue<T>::Front() const
{
if (Y_UNLIKELY(Offset_ >= CachedCount_)) {
auto count = Count_.load(std::memory_order::acquire);
CachedCount_ = count;
if (Offset_ >= count) {
return nullptr;
}
}
while (Y_UNLIKELY(Offset_ >= Head_->Offset + BufferSize)) {
auto next = Head_->Next.load(std::memory_order::acquire);
YT_VERIFY(next);
delete Head_;
Head_ = next;
}
auto position = Offset_ - Head_->Offset;
return &Head_->Data[position];
}
template <class T>
void TSpscQueue<T>::Pop()
{
++Offset_;
}
template <class T>
bool TSpscQueue<T>::IsEmpty() const
{
auto count = Count_.load();
YT_ASSERT(Offset_ <= count);
return Offset_ == count;
}
////////////////////////////////////////////////////////////////////////////////
} // namespace NYT
|