#include "mpmc_unordered_ring.h"
namespace NThreading {
    TMPMCUnorderedRing::TMPMCUnorderedRing(size_t size) {
        Y_VERIFY(size > 0);
        RingSize = size;
        RingBuffer.Reset(new void*[size]);
        // An empty slot is represented by nullptr, so zero the whole buffer.
        memset(&RingBuffer[0], 0, sizeof(void*) * size);
    }

    bool TMPMCUnorderedRing::Push(void* msg, ui16 retryCount) noexcept {
        if (retryCount == 0) {
            // retryCount == 0 delegates to StubbornPush, which is expected
            // not to fail, so success is reported unconditionally.
            StubbornPush(msg);
            return true;
        }
        for (ui16 itry = retryCount; itry-- > 0;) {
            if (WeakPush(msg)) {
                return true;
            }
        }
        return false;
    }

    bool TMPMCUnorderedRing::WeakPush(void* msg) noexcept {
        // Reserve capacity first; roll the reservation back if the ring is full.
        auto pawl = AtomicIncrement(WritePawl);
        if (pawl - AtomicGet(ReadFront) >= RingSize) {
            // Queue is full
            AtomicDecrement(WritePawl);
            return false;
        }

        auto writeSlot = AtomicGetAndIncrement(WriteFront);
        if (AtomicCas(&RingBuffer[writeSlot % RingSize], msg, nullptr)) {
            return true;
        }
        // slot is occupied for some reason, retry
        return false;
    }

    void* TMPMCUnorderedRing::Pop() noexcept {
        ui64 readSlot;

        for (ui16 itry = MAX_POP_TRIES; itry-- > 0;) {
            // Reserve an item to read; roll back if we overtake the writers.
            auto pawl = AtomicIncrement(ReadPawl);
            if (pawl > AtomicGet(WriteFront)) {
                // Queue is empty
                AtomicDecrement(ReadPawl);
                return nullptr;
            }

            readSlot = AtomicGetAndIncrement(ReadFront);
            auto msg = AtomicSwap(&RingBuffer[readSlot % RingSize], nullptr);
            if (msg != nullptr) {
                return msg;
            }
            // The slot was reserved by a producer that has not stored its
            // message yet; move on to the next slot.
        }

        /* got no message in the slot, let's try to rollback readfront */
        AtomicCas(&ReadFront, readSlot - 1, readSlot);
        return nullptr;
    }

    void* TMPMCUnorderedRing::UnsafeScanningPop(ui64* last) noexcept {
        // Scans raw slots [*last, RingSize) directly, bypassing the atomic
        // cursors; only safe when no concurrent producers or consumers run.
        for (; *last < RingSize;) {
            auto msg = AtomicSwap(&RingBuffer[*last], nullptr);
            ++*last;
            if (msg != nullptr) {
                return msg;
            }
        }
        return nullptr;
    }
}
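
// ---------------------------------------------------------------------------
// Usage sketch (illustrative only): one producer and one consumer exchanging a
// heap-allocated message through the ring. TDemoMessage and RunRingDemo are
// hypothetical names; the ring stores raw void* and never owns the messages.
// ---------------------------------------------------------------------------
// #include <thread>
//
// struct TDemoMessage {
//     int Payload;
// };
//
// void RunRingDemo() {
//     NThreading::TMPMCUnorderedRing ring(1024);
//
//     std::thread producer([&ring] {
//         auto* msg = new TDemoMessage{42};
//         // retryCount == 0 pushes unconditionally, so this cannot fail.
//         ring.Push(msg, 0);
//     });
//
//     std::thread consumer([&ring] {
//         void* raw = nullptr;
//         // Pop may return nullptr on a transient miss; keep retrying.
//         while ((raw = ring.Pop()) == nullptr) {
//         }
//         delete static_cast<TDemoMessage*>(raw);
//     });
//
//     producer.join();
//     consumer.join();
// }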