aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/misc/tokenquota.h
blob: d5bdd796257af512b695b28a64c6d5286cfc9c8e (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
#pragma once

#include <util/system/atomic.h>

namespace NBus {
    /* Consumer and feeder quota model implementation.

        The consumer thread only calls:
            Acquire(),  fetches tokens for use from the bucket;
            Consume(),  eats the given number of tokens, which must not
                        exceed the number of tokens already acquired;

        Other threads (feeders) call:
            Return(),   puts used tokens back into the bucket;
     */

    /// Token bucket split into two counters: a shared bucket (`Tokens_`,
    /// touched only through atomic operations) and a plain counter of tokens
    /// already taken out of the bucket (`Acquired`, not atomic).
    class TTokenQuota {
    public:
        /// @param enabled  turns quota accounting on; forced to false when
        ///                 `tokens` is 0.
        /// @param tokens   initial number of tokens placed in the bucket.
        /// @param wake     wake level reported by Return() / IsAboveWake();
        ///                 0 selects the default of max(1, tokens / 2).
        TTokenQuota(bool enabled, size_t tokens, size_t wake)
            : Enabled(tokens > 0 ? enabled : false)
            , Acquired(0)
            // An explicitly requested wake level must be honoured; only a
            // zero `wake` falls back to the "half of the bucket" default.
            , WakeLev(wake < 1 ? Max<size_t>(1, tokens / 2) : wake)
            , Tokens_(tokens)
        {
            Y_UNUSED(padd_);
        }

        /// Tries to have at least `level` tokens acquired.
        ///
        /// `level` is clamped to a minimum of 1. When the quota is enabled and
        /// either fewer than `level` tokens are acquired or `force` is set,
        /// the whole bucket is drained into the acquired counter.
        ///
        /// @return true when the quota is disabled or at least `level` tokens
        ///         are acquired after the call.
        bool Acquire(TAtomic level = 1, bool force = false) {
            level = Max(TAtomicBase(level), TAtomicBase(1));

            if (Enabled && (Acquired < level || force)) {
                // Take everything currently in the bucket in one atomic step.
                Acquired += AtomicSwap(&Tokens_, 0);
            }

            return !Enabled || Acquired >= level;
        }

        /// Spends `items` of the already acquired tokens. No-op when the
        /// quota is disabled. `items` must not exceed the acquired count
        /// (checked with Y_ASSERT).
        void Consume(size_t items) {
            if (Enabled) {
                Y_ASSERT(Acquired >= TAtomicBase(items));

                Acquired -= items;
            }
        }

        /// Puts `items_` tokens back into the bucket.
        ///
        /// @return true only when this call moved the bucket from below the
        ///         wake level to at or above it; false when the quota is
        ///         disabled or `items_` is 0.
        bool Return(size_t items_) noexcept {
            if (!Enabled || items_ == 0)
                return false;

            const TAtomic items = items_;
            // `value` is the bucket level after the addition, hence
            // `value - items` is the level this call started from.
            const TAtomic value = AtomicAdd(Tokens_, items);

            return (value - items < WakeLev && value >= WakeLev);
        }

        /// @return whether quota accounting is active.
        bool IsEnabled() const noexcept {
            return Enabled;
        }

        /// @return true when the quota is disabled or the bucket currently
        ///         holds at least the wake level of tokens.
        bool IsAboveWake() const noexcept {
            return !Enabled || (WakeLev <= AtomicGet(Tokens_));
        }

        /// @return acquired tokens plus the tokens currently in the bucket.
        size_t Tokens() const noexcept {
            return Acquired + AtomicGet(Tokens_);
        }

        /// Tests the acquired counter without touching the bucket.
        ///
        /// @return non-zero (1) when the quota is disabled or at least `level`
        ///         tokens are acquired, 0 otherwise. The result is a boolean
        ///         carried in a size_t.
        size_t Check(const TAtomic level) const noexcept {
            return !Enabled || level <= Acquired;
        }

    private:
        bool Enabled;              // false => every check passes, no accounting
        TAtomicBase Acquired;      // tokens taken out of the bucket, not yet consumed
        const TAtomicBase WakeLev; // bucket level at which Return() reports a wake-up
        TAtomic Tokens_;           // shared bucket, accessed via Atomic* helpers

        /* Padding kept from the original layout; the original note says it is
            needed to align the Tokens_ member on its own CPU cacheline.
            NOTE(review): a single ui64 does not obviously guarantee that --
            confirm the intended layout. */

        ui64 padd_;
    };
}