aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/misc/tokenquota.h
blob: 954cf0f0d721e0dc129a21023d08a4ab25b51ecf (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
#pragma once 
 
#include <util/system/atomic.h> 
 
namespace NBus { 
    /* Consumer and feeder quota model impl. 
 
        Consumer thread only calls: 
            Acquire(),  fetches tokens for usage from bucket; 
            Consume(),  eats given amount of tokens, must not 
                        be greater than Value() items; 
 
        Other threads (feeders) calls: 
            Return(),   put used tokens back to bucket; 
     */ 
 
    class TTokenQuota { 
    public: 
        TTokenQuota(bool enabled, size_t tokens, size_t wake) 
            : Enabled(tokens > 0 ? enabled : false) 
            , Acquired(0) 
            , WakeLev(wake < 1 ? Max<size_t>(1, tokens / 2) : 0) 
            , Tokens_(tokens) 
        {
            Y_UNUSED(padd_);
        }
 
        bool Acquire(TAtomic level = 1, bool force = false) {
            level = Max(TAtomicBase(level), TAtomicBase(1));
 
            if (Enabled && (Acquired < level || force)) {
                Acquired += AtomicSwap(&Tokens_, 0);
            } 
 
            return !Enabled || Acquired >= level; 
        } 
 
        void Consume(size_t items) {
            if (Enabled) {
                Y_ASSERT(Acquired >= TAtomicBase(items));
 
                Acquired -= items; 
            } 
        } 
 
        bool Return(size_t items_) noexcept {
            if (!Enabled || items_ == 0)
                return false; 
 
            const TAtomic items = items_; 
            const TAtomic value = AtomicAdd(Tokens_, items); 
 
            return (value - items < WakeLev && value >= WakeLev); 
        } 
 
        bool IsEnabled() const noexcept {
            return Enabled; 
        } 
 
        bool IsAboveWake() const noexcept {
            return !Enabled || (WakeLev <= AtomicGet(Tokens_)); 
        } 
 
        size_t Tokens() const noexcept {
            return Acquired + AtomicGet(Tokens_); 
        } 
 
        size_t Check(const TAtomic level) const noexcept {
            return !Enabled || level <= Acquired; 
        } 
 
    private: 
        bool Enabled;
        TAtomicBase Acquired;
        const TAtomicBase WakeLev; 
        TAtomic Tokens_;
 
        /* This padd requires for align Tokens_ member on its own 
            CPU cacheline. */ 
 
        ui64 padd_;
    }; 
}