summaryrefslogtreecommitdiffstats
path: root/library/cpp/threading/queue/mpmc_unordered_ring.cpp
blob: df48182210f26fa50042b7d1af77c6fa815a5de2 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
#include "mpmc_unordered_ring.h" 
 
namespace NThreading { 
    // Creates a ring with `size` slots, all of them initially empty.
    // A slot holding a null pointer is treated as empty by the push/pop code,
    // so the freshly allocated storage is zero-filled.
    // `size` must be non-zero; this is enforced with Y_VERIFY.
    TMPMCUnorderedRing::TMPMCUnorderedRing(size_t size) {
        Y_VERIFY(size > 0);

        // Allocate and clear the slot array, then hand ownership to the holder
        void** slots = new void*[size];
        memset(slots, 0, size * sizeof(void*));

        RingBuffer.Reset(slots);
        RingSize = size;
    }
 
    bool TMPMCUnorderedRing::Push(void* msg, ui16 retryCount) noexcept { 
        if (retryCount == 0) { 
            StubbornPush(msg); 
            return true; 
        } 
        for (ui16 itry = retryCount; itry-- > 0;) { 
            if (WeakPush(msg)) { 
                return true; 
            } 
        } 
        return false; 
    } 
 
    // Makes a single, non-looping attempt to store `msg` in the ring.
    // Returns true when the message was written into a slot; false when the
    // ring looked full or the chosen slot was not empty. The caller decides
    // whether to try again (see Push()).
    bool TMPMCUnorderedRing::WeakPush(void* msg) noexcept {
        // Bump the writers' counter and compare it with the current read position
        auto pawl = AtomicIncrement(WritePawl);
        if (pawl - AtomicGet(ReadFront) >= RingSize) {
            // Queue is full: undo the counter bump made above
            AtomicDecrement(WritePawl);
            return false;
        }

        // Take the next write index; WriteFront advances whether or not the store below succeeds
        auto writeSlot = AtomicGetAndIncrement(WriteFront);
        // presumably AtomicCas(target, newValue, expected): msg is stored only if the
        // slot currently holds nullptr — confirm against the atomics header
        if (AtomicCas(&RingBuffer[writeSlot % RingSize], msg, nullptr)) {
            return true;
        }
        // slot is occupied for some reason; report failure so the caller can retry
        // NOTE(review): WritePawl is not decremented on this path — confirm this is intended
        return false;
    }
 
    // Attempts to take one message out of the ring.
    // Returns the message pointer, or nullptr when the ring looks empty or when
    // MAX_POP_TRIES claimed slots in a row turned out to hold no message.
    void* TMPMCUnorderedRing::Pop() noexcept {
        // NOTE(review): assigned only inside the loop; the read after the loop relies on
        // MAX_POP_TRIES being non-zero — confirm where the constant is defined
        ui64 readSlot;

        for (ui16 itry = MAX_POP_TRIES; itry-- > 0;) {
            // Bump the readers' counter and compare it with the current write position
            auto pawl = AtomicIncrement(ReadPawl);
            if (pawl > AtomicGet(WriteFront)) {
                // Queue is empty: undo the counter bump made above
                AtomicDecrement(ReadPawl);
                return nullptr;
            }

            // Take the next read index
            readSlot = AtomicGetAndIncrement(ReadFront);

            // Grab whatever the slot holds and leave nullptr (empty) behind
            auto msg = AtomicSwap(&RingBuffer[readSlot % RingSize], nullptr);
            if (msg != nullptr) {
                return msg;
            }
            // The slot held nothing; try the next one
        }

        /* got no message in the slot, let's try to rollback readfront */
        // NOTE(review): if AtomicGetAndIncrement returns the pre-increment value and
        // AtomicCas takes (target, newValue, expected), ReadFront is already past readSlot
        // here, so this CAS can only succeed if ReadFront has since been moved back to
        // readSlot. ReadPawl is also left incremented on this path. Confirm the intended
        // arguments against the atomics header.
        AtomicCas(&ReadFront, readSlot - 1, readSlot);
        return nullptr;
    }
 
    // Walks the slot array linearly, starting at index `*last`, and returns the
    // first non-null message found, leaving that slot empty.
    // `*last` is advanced past every slot examined, so passing the same cursor
    // again resumes the scan where it stopped.
    // Returns nullptr once `*last` has reached RingSize.
    // The read/write position counters are neither consulted nor updated here.
    void* TMPMCUnorderedRing::UnsafeScanningPop(ui64* last) noexcept {
        while (*last < RingSize) {
            // Empty the slot first, then move the caller's cursor forward
            auto taken = AtomicSwap(&RingBuffer[*last], nullptr);
            *last += 1;
            if (taken != nullptr) {
                return taken;
            }
        }
        return nullptr;
    }
}