aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/sliding_window/sliding_window.h
blob: 180bdf93d0dd4c782cecb00d10733de6bede7704 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
#pragma once

#include <util/datetime/base.h>
#include <util/generic/vector.h>
#include <util/system/guard.h>
#include <util/system/mutex.h>
#include <util/system/types.h>
#include <util/system/yassert.h>

#include <functional>
#include <limits>

namespace NSlidingWindow {
    namespace NPrivate {
        // Bucket operation that tracks an extremum over the window.
        //
        // TCmp(a, b) == true means "b replaces a" (std::less yields a maximum,
        // std::greater yields a minimum). initialValue is what a bucket holds
        // while it has no data.
        template <class TValueType_, class TCmp, TValueType_ initialValue> // std::less for max, std::greater for min
        struct TMinMaxOperationImpl {
            using TValueType = TValueType_;
            using TValueVector = TVector<TValueType>;

            // Value stored in a bucket that has received no data.
            static constexpr TValueType InitialValue() {
                return initialValue;
            }

            // Folds newVal into buckets[index] and returns the resulting window value.
            // The window value can only change when the bucket itself changed.
            static TValueType UpdateBucket(TValueType windowValue, TValueVector& buckets, size_t index, TValueType newVal) {
                Y_ASSERT(index < buckets.size());
                TCmp precedes;
                TValueType& bucket = buckets[index];
                if (!precedes(bucket, newVal)) {
                    // Bucket already holds a value at least as extreme: nothing to do.
                    return windowValue;
                }
                bucket = newVal;
                return precedes(windowValue, newVal) ? newVal : windowValue;
            }

            // Resets bucketsToClear consecutive buckets, starting at firstElemIndex and
            // wrapping at the end of the vector, to InitialValue().
            // Returns windowValue unchanged unless one of the reset buckets held a value
            // equal to it; in that case the window value is recomputed from the buckets.
            static TValueType ClearBuckets(TValueType windowValue, TValueVector& buckets, const size_t firstElemIndex, const size_t bucketsToClear) {
                Y_ASSERT(!buckets.empty());
                Y_ASSERT(firstElemIndex < buckets.size());
                Y_ASSERT(bucketsToClear <= buckets.size());
                TCmp precedes;

                const size_t bucketCount = buckets.size();
                bool extremumDropped = false;
                size_t next = firstElemIndex;
                for (size_t cleared = 0; cleared != bucketsToClear; ++cleared) {
                    TValueType& slot = buckets[next];
                    extremumDropped = extremumDropped || (windowValue == slot);
                    slot = InitialValue();
                    next = (next + 1) % bucketCount;
                }

                if (!extremumDropped) {
                    return windowValue;
                }

                // The current extremum may have lived in a reset bucket: rebuild it.
                // The two ranges below cover every bucket that was not reset; any reset
                // bucket that falls inside them already holds InitialValue().
                TValueType best = InitialValue();
                const auto absorb = [&](size_t from, size_t to) {
                    for (size_t pos = from; pos < to; ++pos) {
                        const TValueType candidate = buckets[pos];
                        if (precedes(best, candidate)) {
                            best = candidate;
                        }
                    }
                };
                absorb(0, firstElemIndex);
                absorb(next, bucketCount);
                return best;
            }
        };

    }

    // Sliding-window maximum: std::less as the comparator, so a bucket is
    // overwritten whenever the stored value is less than the new one.
    // Buckets without data hold numeric_limits::lowest(). lowest() is used rather
    // than min() because for floating-point types min() is the smallest positive
    // normalized value, which is not a valid starting point for a maximum;
    // for integer types the two are the same value.
    template <class TValueType>
    using TMaxOperation = NPrivate::TMinMaxOperationImpl<TValueType, std::less<TValueType>, std::numeric_limits<TValueType>::lowest()>;

    // Sliding-window minimum: std::greater as the comparator, so a bucket is
    // overwritten whenever the stored value is greater than the new one.
    // Buckets without data hold numeric_limits::max().
    template <class TValueType>
    using TMinOperation = NPrivate::TMinMaxOperationImpl<TValueType, std::greater<TValueType>, std::numeric_limits<TValueType>::max()>;

    // Bucket operation that keeps a running sum over the window.
    template <class TValueType_>
    struct TSumOperation {
        using TValueType = TValueType_;
        using TValueVector = TVector<TValueType>;

        // Value of a bucket that has received no data (value-initialized, i.e. zero).
        static constexpr TValueType InitialValue() {
            return TValueType(); // zero
        }

        // Adds newVal to buckets[index] and to the window total; returns the new total.
        static TValueType UpdateBucket(TValueType windowValue, TValueVector& buckets, size_t index, TValueType newVal) {
            Y_ASSERT(index < buckets.size());
            TValueType& bucket = buckets[index];
            bucket += newVal;
            windowValue += newVal;
            return windowValue;
        }

        // Resets bucketsToClear consecutive buckets, starting at firstElemIndex and
        // wrapping at the end of the vector, subtracting each bucket's content from
        // the window total. Returns the reduced total.
        static TValueType ClearBuckets(TValueType windowValue, TValueVector& buckets, size_t firstElemIndex, size_t bucketsToClear) {
            Y_ASSERT(!buckets.empty());
            Y_ASSERT(firstElemIndex < buckets.size());
            Y_ASSERT(bucketsToClear <= buckets.size());

            const size_t bucketCount = buckets.size();
            size_t cursor = firstElemIndex;
            for (size_t remaining = bucketsToClear; remaining > 0; --remaining) {
                TValueType& slot = buckets[cursor];
                windowValue -= slot;
                slot = InitialValue();
                cursor = (cursor + 1) % bucketCount;
            }
            return windowValue;
        }
    };

    /////////////////////////////////////////////////////////////////////////////////////////
    // TSlidingWindow
    /////////////////////////////////////////////////////////////////////////////////////////
    template <class TOperation, class TMutexImpl = TFakeMutex>
    class TSlidingWindow {
    public:
        using TValueType = typename TOperation::TValueType;
        using TValueVector = TVector<TValueType>;
        using TSizeType = typename TValueVector::size_type;

    public:
        TSlidingWindow(const TDuration& length, TSizeType partsNum)
            : Mutex()
            , Buckets(partsNum, TOperation::InitialValue()) // vector of size partsNum initialized with initial value
            , WindowValue(TOperation::InitialValue())
            , FirstElem(0)
            , PeriodStart()
            , Length(length)
            , MicroSecondsPerBucket(length.MicroSeconds() / partsNum)
        {
        }

        TSlidingWindow(const TSlidingWindow& w)
            : Mutex()
        {
            TGuard<TMutexImpl> guard(&w.Mutex);
            Buckets = w.Buckets;
            WindowValue = w.WindowValue;
            FirstElem = w.FirstElem;
            PeriodStart = w.PeriodStart;
            Length = w.Length;
            MicroSecondsPerBucket = w.MicroSecondsPerBucket;
        }

        TSlidingWindow(TSlidingWindow&&) = default;

        TSlidingWindow& operator=(TSlidingWindow&&) = default;
        TSlidingWindow& operator=(const TSlidingWindow&) = delete;

        // Period of time
        const TDuration& GetDuration() const {
            return Length;
        }

        // Update window with new value and time
        TValueType Update(TValueType val, TInstant t) {
            TGuard<TMutexImpl> guard(&Mutex);
            AdvanceTime(t);
            UpdateCurrentBucket(val);
            return WindowValue;
        }

        // Update just time, without new values
        TValueType Update(TInstant t) {
            TGuard<TMutexImpl> guard(&Mutex);
            AdvanceTime(t);
            return WindowValue;
        }

        // Get current window value (without updating current time)
        TValueType GetValue() const {
            TGuard<TMutexImpl> guard(&Mutex);
            return WindowValue;
        }

    private:
        void UpdateCurrentBucket(TValueType val) {
            const TSizeType arraySize = Buckets.size();
            const TSizeType pos = (FirstElem + arraySize - 1) % arraySize;
            WindowValue = TOperation::UpdateBucket(WindowValue, Buckets, pos, val);
        }

        void AdvanceTime(const TInstant& time) {
            if (time < PeriodStart + Length) {
                return;
            }

            if (PeriodStart.MicroSeconds() == 0) {
                PeriodStart = time - Length;
                return;
            }

            const TInstant& newPeriodStart = time - Length;
            const ui64 tmDiff = (newPeriodStart - PeriodStart).MicroSeconds();
            const TSizeType bucketsDiff = tmDiff / MicroSecondsPerBucket;
            const TSizeType arraySize = Buckets.size();
            const TSizeType buckets = Min(bucketsDiff, arraySize);

            WindowValue = TOperation::ClearBuckets(WindowValue, Buckets, FirstElem, buckets);
            FirstElem = (FirstElem + buckets) % arraySize;
            PeriodStart += TDuration::MicroSeconds(bucketsDiff * MicroSecondsPerBucket);

            // Check that PeriodStart lags behind newPeriodStart
            // (which is actual, uptodate, precise and equal to time - Length) not more
            // then MicroSecondsPerBucket
            Y_ASSERT(newPeriodStart >= PeriodStart);
            Y_ASSERT((newPeriodStart - PeriodStart).MicroSeconds() <= MicroSecondsPerBucket);
        }


        mutable TMutexImpl Mutex;
        TValueVector Buckets;
        TValueType WindowValue;
        TSizeType FirstElem;
        TInstant PeriodStart;
        TDuration Length;
        ui64 MicroSecondsPerBucket;
    };

}