aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/threading/queue/mpmc_unordered_ring.cpp
blob: 160547f5946d0fc361a667e06cb0370fdc1139bb (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
#include "mpmc_unordered_ring.h"

namespace NThreading {
    TMPMCUnorderedRing::TMPMCUnorderedRing(size_t size) {
        Y_VERIFY(size > 0);
        RingSize = size;
        RingBuffer.Reset(new void*[size]);
        memset(&RingBuffer[0], 0, sizeof(void*) * size);
    }

    bool TMPMCUnorderedRing::Push(void* msg, ui16 retryCount) noexcept {
        if (retryCount == 0) {
            StubbornPush(msg);
            return true;
        }
        for (ui16 itry = retryCount; itry-- > 0;) {
            if (WeakPush(msg)) {
                return true;
            }
        }
        return false;
    }

    bool TMPMCUnorderedRing::WeakPush(void* msg) noexcept {
        auto pawl = AtomicIncrement(WritePawl);
        if (pawl - AtomicGet(ReadFront) >= RingSize) {
            // Queue is full
            AtomicDecrement(WritePawl);
            return false;
        }

        auto writeSlot = AtomicGetAndIncrement(WriteFront);
        if (AtomicCas(&RingBuffer[writeSlot % RingSize], msg, nullptr)) {
            return true;
        }
        // slot is occupied for some reason, retry
        return false;
    }

    void* TMPMCUnorderedRing::Pop() noexcept {
        ui64 readSlot;

        for (ui16 itry = MAX_POP_TRIES; itry-- > 0;) {
            auto pawl = AtomicIncrement(ReadPawl);
            if (pawl > AtomicGet(WriteFront)) {
                // Queue is empty
                AtomicDecrement(ReadPawl);
                return nullptr;
            }

            readSlot = AtomicGetAndIncrement(ReadFront);

            auto msg = AtomicSwap(&RingBuffer[readSlot % RingSize], nullptr);
            if (msg != nullptr) {
                return msg;
            }
        }

        /* got no message in the slot, let's try to rollback readfront */
        AtomicCas(&ReadFront, readSlot - 1, readSlot);
        return nullptr;
    }

    void* TMPMCUnorderedRing::UnsafeScanningPop(ui64* last) noexcept {
        for (; *last < RingSize;) {
            auto msg = AtomicSwap(&RingBuffer[*last], nullptr);
            ++*last;
            if (msg != nullptr) {
                return msg;
            }
        }
        return nullptr;
    }
}