aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/sliding_window
diff options
context:
space:
mode:
authorDevtools Arcadia <arcadia-devtools@yandex-team.ru>2022-02-07 18:08:42 +0300
committerDevtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net>2022-02-07 18:08:42 +0300
commit1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch)
treee26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/sliding_window
downloadydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/sliding_window')
-rw-r--r--library/cpp/sliding_window/README.md30
-rw-r--r--library/cpp/sliding_window/sliding_window.cpp1
-rw-r--r--library/cpp/sliding_window/sliding_window.h224
-rw-r--r--library/cpp/sliding_window/sliding_window_ut.cpp132
-rw-r--r--library/cpp/sliding_window/ut/ya.make9
-rw-r--r--library/cpp/sliding_window/ya.make10
6 files changed, 406 insertions, 0 deletions
diff --git a/library/cpp/sliding_window/README.md b/library/cpp/sliding_window/README.md
new file mode 100644
index 0000000000..47692da7d5
--- /dev/null
+++ b/library/cpp/sliding_window/README.md
@@ -0,0 +1,30 @@
+# TSlidingWindow - скользящее окно
+
+[TSlidingWindow](/arc/trunk/arcadia/library/cpp/sliding_window/sliding_window.h) - класс скользящего окна, позволяющий поддерживать и обновлять определённое значение (максимум, сумму) в промежутке времени определённой длины. Разбивает общий временной промежуток на маленькие бакеты (число задаётся в конструкторе) и ротирует их, поддерживая значение за окно. Есть возможность также указать мьютекс или спинлок для синхронизации (по умолчанию TFakeMutex). Использование:
+
+```
+// Создаём окно, вычисляющее максимум за последние пять минут, поддерживая 50 бакетов со значениями.
+TSlidingWindow<TMaxOperation<int>> window(TDuration::Minutes(5), 50);
+
+// Загружаем значения в различные моменты времени
+window.Update(42, TInstant::Now());
+
+... // делаем какую-то работу
+int currentMaximum = window.Update(50, TInstant::Now());
+
+... // делаем ещё что-то
+int currentMaximum = window.Update(25, TInstant::Now());
+
+...
+// Просто получаем значение максимума за последние 5 минут
+int currentMaximum = window.Update(TInstant::Now());
+
+...
+int currentMaximum = window.GetValue(); // получение значения без обновления времени
+```
+
+# Поддерживаемые функции
+
+* `TMaxOperation` - максимум
+* `TMinOperation` - минимум
+* `TSumOperation` - сумма
diff --git a/library/cpp/sliding_window/sliding_window.cpp b/library/cpp/sliding_window/sliding_window.cpp
new file mode 100644
index 0000000000..086cce5d02
--- /dev/null
+++ b/library/cpp/sliding_window/sliding_window.cpp
@@ -0,0 +1 @@
+#include "sliding_window.h"
diff --git a/library/cpp/sliding_window/sliding_window.h b/library/cpp/sliding_window/sliding_window.h
new file mode 100644
index 0000000000..180bdf93d0
--- /dev/null
+++ b/library/cpp/sliding_window/sliding_window.h
@@ -0,0 +1,224 @@
+#pragma once
+
+#include <util/datetime/base.h>
+#include <util/generic/vector.h>
+#include <util/system/guard.h>
+#include <util/system/mutex.h>
+#include <util/system/types.h>
+#include <util/system/yassert.h>
+
+#include <functional>
+#include <limits>
+
+namespace NSlidingWindow {
+ namespace NPrivate {
+ template <class TValueType_, class TCmp, TValueType_ initialValue> // std::less for max, std::greater for min
+ struct TMinMaxOperationImpl {
+ using TValueType = TValueType_;
+ using TValueVector = TVector<TValueType>;
+
+ static constexpr TValueType InitialValue() {
+ return initialValue;
+ }
+
+ // Updates value in current bucket and returns window value
+ static TValueType UpdateBucket(TValueType windowValue, TValueVector& buckets, size_t index, TValueType newVal) {
+ Y_ASSERT(index < buckets.size());
+ TCmp cmp;
+ TValueType& curVal = buckets[index];
+ if (cmp(curVal, newVal)) {
+ curVal = newVal;
+ if (cmp(windowValue, newVal)) {
+ windowValue = newVal;
+ }
+ }
+ return windowValue;
+ }
+
+ static TValueType ClearBuckets(TValueType windowValue, TValueVector& buckets, const size_t firstElemIndex, const size_t bucketsToClear) {
+ Y_ASSERT(!buckets.empty());
+ Y_ASSERT(firstElemIndex < buckets.size());
+ Y_ASSERT(bucketsToClear <= buckets.size());
+ TCmp cmp;
+
+ bool needRecalc = false;
+ size_t current = firstElemIndex;
+ const size_t arraySize = buckets.size();
+ for (size_t i = 0; i < bucketsToClear; ++i) {
+ TValueType& curVal = buckets[current];
+ if (!needRecalc && windowValue == curVal) {
+ needRecalc = true;
+ }
+ curVal = InitialValue();
+ current = (current + 1) % arraySize;
+ }
+ if (needRecalc) {
+ windowValue = InitialValue();
+ for (size_t i = 0; i < firstElemIndex; ++i) {
+ const TValueType val = buckets[i];
+ if (cmp(windowValue, val)) {
+ windowValue = val;
+ }
+ }
+ for (size_t i = current, size = buckets.size(); i < size; ++i) {
+ const TValueType val = buckets[i];
+ if (cmp(windowValue, val)) {
+ windowValue = val;
+ }
+ }
+ }
+ return windowValue;
+ }
+ };
+
+ }
+
+ template <class TValueType>
+ using TMaxOperation = NPrivate::TMinMaxOperationImpl<TValueType, std::less<TValueType>, std::numeric_limits<TValueType>::min()>;
+
+ template <class TValueType>
+ using TMinOperation = NPrivate::TMinMaxOperationImpl<TValueType, std::greater<TValueType>, std::numeric_limits<TValueType>::max()>;
+
+ template <class TValueType_>
+ struct TSumOperation {
+ using TValueType = TValueType_;
+ using TValueVector = TVector<TValueType>;
+
+ static constexpr TValueType InitialValue() {
+ return TValueType(); // zero
+ }
+
+ // Updates value in current bucket and returns window value
+ static TValueType UpdateBucket(TValueType windowValue, TValueVector& buckets, size_t index, TValueType newVal) {
+ Y_ASSERT(index < buckets.size());
+ buckets[index] += newVal;
+ windowValue += newVal;
+ return windowValue;
+ }
+
+ static TValueType ClearBuckets(TValueType windowValue, TValueVector& buckets, size_t firstElemIndex, size_t bucketsToClear) {
+ Y_ASSERT(!buckets.empty());
+ Y_ASSERT(firstElemIndex < buckets.size());
+ Y_ASSERT(bucketsToClear <= buckets.size());
+
+ const size_t arraySize = buckets.size();
+ for (size_t i = 0; i < bucketsToClear; ++i) {
+ TValueType& curVal = buckets[firstElemIndex];
+ windowValue -= curVal;
+ curVal = InitialValue();
+ firstElemIndex = (firstElemIndex + 1) % arraySize;
+ }
+ return windowValue;
+ }
+ };
+
+ /////////////////////////////////////////////////////////////////////////////////////////
+ // TSlidingWindow
+ /////////////////////////////////////////////////////////////////////////////////////////
+ template <class TOperation, class TMutexImpl = TFakeMutex>
+ class TSlidingWindow {
+ public:
+ using TValueType = typename TOperation::TValueType;
+ using TValueVector = TVector<TValueType>;
+ using TSizeType = typename TValueVector::size_type;
+
+ public:
+ TSlidingWindow(const TDuration& length, TSizeType partsNum)
+ : Mutex()
+ , Buckets(partsNum, TOperation::InitialValue()) // vector of size partsNum initialized with initial value
+ , WindowValue(TOperation::InitialValue())
+ , FirstElem(0)
+ , PeriodStart()
+ , Length(length)
+ , MicroSecondsPerBucket(length.MicroSeconds() / partsNum)
+ {
+ }
+
+ TSlidingWindow(const TSlidingWindow& w)
+ : Mutex()
+ {
+ TGuard<TMutexImpl> guard(&w.Mutex);
+ Buckets = w.Buckets;
+ WindowValue = w.WindowValue;
+ FirstElem = w.FirstElem;
+ PeriodStart = w.PeriodStart;
+ Length = w.Length;
+ MicroSecondsPerBucket = w.MicroSecondsPerBucket;
+ }
+
+ TSlidingWindow(TSlidingWindow&&) = default;
+
+ TSlidingWindow& operator=(TSlidingWindow&&) = default;
+ TSlidingWindow& operator=(const TSlidingWindow&) = delete;
+
+ // Period of time
+ const TDuration& GetDuration() const {
+ return Length;
+ }
+
+ // Update window with new value and time
+ TValueType Update(TValueType val, TInstant t) {
+ TGuard<TMutexImpl> guard(&Mutex);
+ AdvanceTime(t);
+ UpdateCurrentBucket(val);
+ return WindowValue;
+ }
+
+ // Update just time, without new values
+ TValueType Update(TInstant t) {
+ TGuard<TMutexImpl> guard(&Mutex);
+ AdvanceTime(t);
+ return WindowValue;
+ }
+
+ // Get current window value (without updating current time)
+ TValueType GetValue() const {
+ TGuard<TMutexImpl> guard(&Mutex);
+ return WindowValue;
+ }
+
+ private:
+ void UpdateCurrentBucket(TValueType val) {
+ const TSizeType arraySize = Buckets.size();
+ const TSizeType pos = (FirstElem + arraySize - 1) % arraySize;
+ WindowValue = TOperation::UpdateBucket(WindowValue, Buckets, pos, val);
+ }
+
+ void AdvanceTime(const TInstant& time) {
+ if (time < PeriodStart + Length) {
+ return;
+ }
+
+ if (PeriodStart.MicroSeconds() == 0) {
+ PeriodStart = time - Length;
+ return;
+ }
+
+ const TInstant& newPeriodStart = time - Length;
+ const ui64 tmDiff = (newPeriodStart - PeriodStart).MicroSeconds();
+ const TSizeType bucketsDiff = tmDiff / MicroSecondsPerBucket;
+ const TSizeType arraySize = Buckets.size();
+ const TSizeType buckets = Min(bucketsDiff, arraySize);
+
+ WindowValue = TOperation::ClearBuckets(WindowValue, Buckets, FirstElem, buckets);
+ FirstElem = (FirstElem + buckets) % arraySize;
+ PeriodStart += TDuration::MicroSeconds(bucketsDiff * MicroSecondsPerBucket);
+
+ // Check that PeriodStart lags behind newPeriodStart
+ // (which is actual, uptodate, precise and equal to time - Length) not more
+ // then MicroSecondsPerBucket
+ Y_ASSERT(newPeriodStart >= PeriodStart);
+ Y_ASSERT((newPeriodStart - PeriodStart).MicroSeconds() <= MicroSecondsPerBucket);
+ }
+
+
+ mutable TMutexImpl Mutex;
+ TValueVector Buckets;
+ TValueType WindowValue;
+ TSizeType FirstElem;
+ TInstant PeriodStart;
+ TDuration Length;
+ ui64 MicroSecondsPerBucket;
+ };
+
+}
diff --git a/library/cpp/sliding_window/sliding_window_ut.cpp b/library/cpp/sliding_window/sliding_window_ut.cpp
new file mode 100644
index 0000000000..1e7343a8d3
--- /dev/null
+++ b/library/cpp/sliding_window/sliding_window_ut.cpp
@@ -0,0 +1,132 @@
+#include "sliding_window.h"
+
+#include <library/cpp/testing/unittest/registar.h>
+
+using namespace NSlidingWindow;
+
+Y_UNIT_TEST_SUITE(TSlidingWindowTest) {
+ Y_UNIT_TEST(TestSlidingWindowMax) {
+ TSlidingWindow<TMaxOperation<unsigned>> w(TDuration::Minutes(5), 5);
+ TInstant start = TInstant::MicroSeconds(TDuration::Hours(1).MicroSeconds());
+ TInstant now = start;
+ w.Update(5, start); // ~ ~ ~ ~ 5
+ UNIT_ASSERT_VALUES_EQUAL(w.GetValue(), 5); // ^
+ now += TDuration::Minutes(1) + TDuration::Seconds(1);
+ w.Update(5, now); // 5 ~ ~ ~ 5
+ UNIT_ASSERT_VALUES_EQUAL(w.GetValue(), 5); // ^
+ now += TDuration::Minutes(1);
+ w.Update(3, now); // 5 3 ~ ~ 5
+ UNIT_ASSERT_VALUES_EQUAL(w.GetValue(), 5); // ^
+ now += TDuration::Minutes(3);
+ w.Update(2, now); // 5 3 ~ ~ 2
+ UNIT_ASSERT_VALUES_EQUAL(w.GetValue(), 5); // ^
+ now += TDuration::Minutes(1);
+ w.Update(2, now); // 2 3 ~ ~ 2
+ UNIT_ASSERT_VALUES_EQUAL(w.GetValue(), 3); // ^
+ now += TDuration::Minutes(1);
+ w.Update(2, now); // 2 2 ~ ~ 2
+ UNIT_ASSERT_VALUES_EQUAL(w.GetValue(), 2); // ^
+ now += TDuration::Minutes(5);
+ w.Update(1, now); // ~ 1 ~ ~ ~
+ UNIT_ASSERT_VALUES_EQUAL(w.GetValue(), 1); // ^
+
+ // update current bucket
+ w.Update(2, now); // ~ 2 ~ ~ ~
+ UNIT_ASSERT_VALUES_EQUAL(w.GetValue(), 2); // ^
+
+ w.Update(1, now + TDuration::Seconds(30)); // ~ 2 ~ ~ ~
+ UNIT_ASSERT_VALUES_EQUAL(w.GetValue(), 2); // ^
+
+ // test idle
+ now += TDuration::Minutes(1);
+ w.Update(now); // ~ 2 ~ ~ ~
+ UNIT_ASSERT_VALUES_EQUAL(w.GetValue(), 2); // ^
+
+ now += TDuration::Minutes(5); // ~ ~ ~ ~ ~
+ UNIT_ASSERT_VALUES_EQUAL(w.Update(now), 0);
+ }
+
+ Y_UNIT_TEST(TestSlidingWindowMin) {
+ TSlidingWindow<TMinOperation<unsigned>> w(TDuration::Minutes(5), 5);
+ TInstant start = TInstant::MicroSeconds(TDuration::Hours(1).MicroSeconds());
+ TInstant now = start;
+ w.Update(5, start); // ~ ~ ~ ~ 5
+ UNIT_ASSERT_VALUES_EQUAL(w.GetValue(), 5); // ^
+ now += TDuration::Minutes(1) + TDuration::Seconds(1);
+ w.Update(5, now); // 5 ~ ~ ~ 5
+ UNIT_ASSERT_VALUES_EQUAL(w.GetValue(), 5); // ^
+ now += TDuration::Minutes(1);
+ w.Update(7, now); // 5 7 ~ ~ 5
+ UNIT_ASSERT_VALUES_EQUAL(w.GetValue(), 5); // ^
+ now += TDuration::Minutes(3);
+ w.Update(8, now); // 5 7 ~ ~ 8
+ UNIT_ASSERT_VALUES_EQUAL(w.GetValue(), 5); // ^
+ now += TDuration::Minutes(1);
+ w.Update(8, now); // 8 7 ~ ~ 8
+ UNIT_ASSERT_VALUES_EQUAL(w.GetValue(), 7); // ^
+ now += TDuration::Minutes(1);
+ w.Update(8, now); // 8 8 ~ ~ 8
+ UNIT_ASSERT_VALUES_EQUAL(w.GetValue(), 8); // ^
+ now += TDuration::Minutes(5);
+ w.Update(6, now); // ~ 6 ~ ~ ~
+ UNIT_ASSERT_VALUES_EQUAL(w.GetValue(), 6); // ^
+
+ // update current bucket
+ w.Update(5, now); // ~ 5 ~ ~ ~
+ UNIT_ASSERT_VALUES_EQUAL(w.GetValue(), 5); // ^
+
+ w.Update(6, now + TDuration::Seconds(30)); // ~ 5 ~ ~ ~
+ UNIT_ASSERT_VALUES_EQUAL(w.GetValue(), 5); // ^
+
+ // test idle
+ now += TDuration::Minutes(1);
+ w.Update(now); // ~ 5 ~ ~ ~
+ UNIT_ASSERT_VALUES_EQUAL(w.GetValue(), 5); // ^
+
+ now += TDuration::Minutes(5); // ~ ~ ~ ~ ~
+ UNIT_ASSERT_VALUES_EQUAL(w.Update(now), std::numeric_limits<unsigned>::max());
+ }
+
+ Y_UNIT_TEST(TestSlidingWindowSum) {
+ TSlidingWindow<TSumOperation<unsigned>> w(TDuration::Minutes(5), 5);
+ UNIT_ASSERT_VALUES_EQUAL(w.GetValue(), 0); // current sum
+
+ TInstant start = TInstant::MicroSeconds(TDuration::Hours(1).MicroSeconds());
+ TInstant now = start;
+ w.Update(5, start); // 0 0 0 0 5
+ UNIT_ASSERT_VALUES_EQUAL(w.GetValue(), 5); // ^
+ now += TDuration::Minutes(1) + TDuration::Seconds(1);
+ w.Update(5, now); // 5 0 0 0 5
+ UNIT_ASSERT_VALUES_EQUAL(w.GetValue(), 10); // ^
+ now += TDuration::Minutes(1);
+ w.Update(3, now); // 5 3 0 0 5
+ UNIT_ASSERT_VALUES_EQUAL(w.GetValue(), 13); // ^
+ now += TDuration::Minutes(3);
+ w.Update(2, now); // 5 3 0 0 2
+ UNIT_ASSERT_VALUES_EQUAL(w.GetValue(), 10); // ^
+ now += TDuration::Minutes(1);
+ w.Update(2, now); // 2 3 0 0 2
+ UNIT_ASSERT_VALUES_EQUAL(w.GetValue(), 7); // ^
+ now += TDuration::Minutes(1);
+ w.Update(2, now); // 2 2 0 0 2
+ UNIT_ASSERT_VALUES_EQUAL(w.GetValue(), 6); // ^
+ now += TDuration::Minutes(5);
+ w.Update(1, now); // 0 1 0 0 0
+ UNIT_ASSERT_VALUES_EQUAL(w.GetValue(), 1); // ^
+
+ // update current bucket
+ w.Update(2, now); // 0 3 0 0 0
+ UNIT_ASSERT_VALUES_EQUAL(w.GetValue(), 3); // ^
+
+ w.Update(1, now + TDuration::Seconds(30)); // 0 4 0 0 0
+ UNIT_ASSERT_VALUES_EQUAL(w.GetValue(), 4); // ^
+
+ // test idle
+ now += TDuration::Minutes(1);
+ w.Update(now); // 0 4 0 0 0
+ UNIT_ASSERT_VALUES_EQUAL(w.GetValue(), 4); // ^
+
+ now += TDuration::Minutes(5); // 0 0 0 0 0
+ UNIT_ASSERT_VALUES_EQUAL(w.Update(now), 0);
+ }
+}
diff --git a/library/cpp/sliding_window/ut/ya.make b/library/cpp/sliding_window/ut/ya.make
new file mode 100644
index 0000000000..3839a8dadc
--- /dev/null
+++ b/library/cpp/sliding_window/ut/ya.make
@@ -0,0 +1,9 @@
+UNITTEST_FOR(library/cpp/sliding_window)
+
+OWNER(g:kikimr)
+
+SRCS(
+ sliding_window_ut.cpp
+)
+
+END()
diff --git a/library/cpp/sliding_window/ya.make b/library/cpp/sliding_window/ya.make
new file mode 100644
index 0000000000..79aeaa06bb
--- /dev/null
+++ b/library/cpp/sliding_window/ya.make
@@ -0,0 +1,10 @@
+LIBRARY()
+
+OWNER(g:kikimr)
+
+SRCS(
+ sliding_window.cpp
+ sliding_window.h
+)
+
+END()