aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/actor/ring_buffer_with_spin_lock.h
blob: 3d11775ec31c760921f81cfca65e29082b268be2 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
#pragma once

#include "ring_buffer.h"

#include <library/cpp/deprecated/atomic/atomic.h>

#include <util/system/spinlock.h>

template <typename T>
class TRingBufferWithSpinLock {
private:
    TRingBuffer<T> RingBuffer;
    TSpinLock SpinLock;
    TAtomic CachedSize;

public:
    TRingBufferWithSpinLock()
        : CachedSize(0)
    {
    }

    void Push(const T& t) {
        PushAll(t);
    }

    void PushAll(TArrayRef<const T> collection) {
        if (collection.empty()) {
            return;
        }

        TGuard<TSpinLock> Guard(SpinLock);
        RingBuffer.PushAll(collection);
        AtomicSet(CachedSize, RingBuffer.Size());
    }

    bool TryPop(T* r, size_t* sizePtr = nullptr) {
        if (AtomicGet(CachedSize) == 0) {
            return false;
        }

        bool ok;
        size_t size;
        {
            TGuard<TSpinLock> Guard(SpinLock);
            ok = RingBuffer.TryPop(r);
            size = RingBuffer.Size();
            AtomicSet(CachedSize, size);
        }
        if (!!sizePtr) {
            *sizePtr = size;
        }
        return ok;
    }

    TMaybe<T> TryPop() {
        T tmp;
        if (TryPop(&tmp)) {
            return tmp;
        } else {
            return TMaybe<T>();
        }
    }

    bool PushAllAndTryPop(TArrayRef<const T> collection, T* r) {
        if (collection.size() == 0) {
            return TryPop(r);
        } else {
            if (AtomicGet(CachedSize) == 0) {
                *r = collection[0];
                if (collection.size() > 1) {
                    TGuard<TSpinLock> guard(SpinLock);
                    RingBuffer.PushAll(MakeArrayRef(collection.data() + 1, collection.size() - 1));
                    AtomicSet(CachedSize, RingBuffer.Size());
                }
            } else {
                TGuard<TSpinLock> guard(SpinLock);
                RingBuffer.PushAll(collection);
                *r = RingBuffer.Pop();
                AtomicSet(CachedSize, RingBuffer.Size());
            }
            return true;
        }
    }

    bool Empty() const {
        return AtomicGet(CachedSize) == 0;
    }

    size_t Size() const {
        TGuard<TSpinLock> Guard(SpinLock);
        return RingBuffer.Size();
    }
};