diff options
author | Devtools Arcadia <arcadia-devtools@yandex-team.ru> | 2022-02-07 18:08:42 +0300 |
---|---|---|
committer | Devtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net> | 2022-02-07 18:08:42 +0300 |
commit | 1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch) | |
tree | e26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/sliding_window | |
download | ydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz |
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/sliding_window')
-rw-r--r-- | library/cpp/sliding_window/README.md | 30 | ||||
-rw-r--r-- | library/cpp/sliding_window/sliding_window.cpp | 1 | ||||
-rw-r--r-- | library/cpp/sliding_window/sliding_window.h | 224 | ||||
-rw-r--r-- | library/cpp/sliding_window/sliding_window_ut.cpp | 132 | ||||
-rw-r--r-- | library/cpp/sliding_window/ut/ya.make | 9 | ||||
-rw-r--r-- | library/cpp/sliding_window/ya.make | 10 |
6 files changed, 406 insertions, 0 deletions
diff --git a/library/cpp/sliding_window/README.md b/library/cpp/sliding_window/README.md new file mode 100644 index 0000000000..47692da7d5 --- /dev/null +++ b/library/cpp/sliding_window/README.md @@ -0,0 +1,30 @@ +# TSlidingWindow - скользящее окно + +[TSlidingWindow](/arc/trunk/arcadia/library/cpp/sliding_window/sliding_window.h) - класс скользящего окна, позволяющий поддерживать и обновлять определённое значение (максимум, сумму) в промежутке времени определённой длины. Разбивает общий временной промежуток на маленькие бакеты (число задаётся в конструкторе) и ротирует их, поддерживая значение за окно. Есть возможность также указать мьютекс или спинлок для синхронизации (по умолчанию TFakeMutex). Использование: + +``` +// Создаём окно, вычисляющее максимум за последние пять минут, поддерживая 50 бакетов со значениями. +TSlidingWindow<TMaxOperation<int>> window(TDuration::Minutes(5), 50); + +// Загружаем значения в различные моменты времени +window.Update(42, TInstant::Now()); + +... // делаем какую-то работу +int currentMaximum = window.Update(50, TInstant::Now()); + +... // делаем ещё что-то +int currentMaximum = window.Update(25, TInstant::Now()); + +... +// Просто получаем значение максимума за последние 5 минут +int currentMaximum = window.Update(TInstant::Now()); + +... +int currentMaximum = window.GetValue(); // получение значения без обновления времени +``` + +# Поддерживаемые функции + +* `TMaxOperation` - максимум +* `TMinOperation` - минимум +* `TSumOperation` - сумма diff --git a/library/cpp/sliding_window/sliding_window.cpp b/library/cpp/sliding_window/sliding_window.cpp new file mode 100644 index 0000000000..086cce5d02 --- /dev/null +++ b/library/cpp/sliding_window/sliding_window.cpp @@ -0,0 +1 @@ +#include "sliding_window.h" diff --git a/library/cpp/sliding_window/sliding_window.h b/library/cpp/sliding_window/sliding_window.h new file mode 100644 index 0000000000..180bdf93d0 --- /dev/null +++ b/library/cpp/sliding_window/sliding_window.h @@ -0,0 +1,224 @@ +#pragma once + +#include <util/datetime/base.h> +#include <util/generic/vector.h> +#include <util/system/guard.h> +#include <util/system/mutex.h> +#include <util/system/types.h> +#include <util/system/yassert.h> + +#include <functional> +#include <limits> + +namespace NSlidingWindow { + namespace NPrivate { + template <class TValueType_, class TCmp, TValueType_ initialValue> // std::less for max, std::greater for min + struct TMinMaxOperationImpl { + using TValueType = TValueType_; + using TValueVector = TVector<TValueType>; + + static constexpr TValueType InitialValue() { + return initialValue; + } + + // Updates value in current bucket and returns window value + static TValueType UpdateBucket(TValueType windowValue, TValueVector& buckets, size_t index, TValueType newVal) { + Y_ASSERT(index < buckets.size()); + TCmp cmp; + TValueType& curVal = buckets[index]; + if (cmp(curVal, newVal)) { + curVal = newVal; + if (cmp(windowValue, newVal)) { + windowValue = newVal; + } + } + return windowValue; + } + + static TValueType ClearBuckets(TValueType windowValue, TValueVector& buckets, const size_t firstElemIndex, const size_t bucketsToClear) { + Y_ASSERT(!buckets.empty()); + Y_ASSERT(firstElemIndex < buckets.size()); + Y_ASSERT(bucketsToClear <= buckets.size()); + TCmp cmp; + + bool needRecalc = false; + size_t current = firstElemIndex; + const size_t arraySize = buckets.size(); + for (size_t i = 0; i < bucketsToClear; ++i) { + TValueType& curVal = buckets[current]; + if (!needRecalc && windowValue == curVal) { + needRecalc = true; + } + curVal = InitialValue(); + current = (current + 1) % arraySize; + } + if (needRecalc) { + windowValue = InitialValue(); + for (size_t i = 0; i < firstElemIndex; ++i) { + const TValueType val = buckets[i]; + if (cmp(windowValue, val)) { + windowValue = val; + } + } + for (size_t i = current, size = buckets.size(); i < size; ++i) { + const TValueType val = buckets[i]; + if (cmp(windowValue, val)) { + windowValue = val; + } + } + } + return windowValue; + } + }; + + } + + template <class TValueType> + using TMaxOperation = NPrivate::TMinMaxOperationImpl<TValueType, std::less<TValueType>, std::numeric_limits<TValueType>::min()>; + + template <class TValueType> + using TMinOperation = NPrivate::TMinMaxOperationImpl<TValueType, std::greater<TValueType>, std::numeric_limits<TValueType>::max()>; + + template <class TValueType_> + struct TSumOperation { + using TValueType = TValueType_; + using TValueVector = TVector<TValueType>; + + static constexpr TValueType InitialValue() { + return TValueType(); // zero + } + + // Updates value in current bucket and returns window value + static TValueType UpdateBucket(TValueType windowValue, TValueVector& buckets, size_t index, TValueType newVal) { + Y_ASSERT(index < buckets.size()); + buckets[index] += newVal; + windowValue += newVal; + return windowValue; + } + + static TValueType ClearBuckets(TValueType windowValue, TValueVector& buckets, size_t firstElemIndex, size_t bucketsToClear) { + Y_ASSERT(!buckets.empty()); + Y_ASSERT(firstElemIndex < buckets.size()); + Y_ASSERT(bucketsToClear <= buckets.size()); + + const size_t arraySize = buckets.size(); + for (size_t i = 0; i < bucketsToClear; ++i) { + TValueType& curVal = buckets[firstElemIndex]; + windowValue -= curVal; + curVal = InitialValue(); + firstElemIndex = (firstElemIndex + 1) % arraySize; + } + return windowValue; + } + }; + + ///////////////////////////////////////////////////////////////////////////////////////// + // TSlidingWindow + ///////////////////////////////////////////////////////////////////////////////////////// + template <class TOperation, class TMutexImpl = TFakeMutex> + class TSlidingWindow { + public: + using TValueType = typename TOperation::TValueType; + using TValueVector = TVector<TValueType>; + using TSizeType = typename TValueVector::size_type; + + public: + TSlidingWindow(const TDuration& length, TSizeType partsNum) + : Mutex() + , Buckets(partsNum, TOperation::InitialValue()) // vector of size partsNum initialized with initial value + , WindowValue(TOperation::InitialValue()) + , FirstElem(0) + , PeriodStart() + , Length(length) + , MicroSecondsPerBucket(length.MicroSeconds() / partsNum) + { + } + + TSlidingWindow(const TSlidingWindow& w) + : Mutex() + { + TGuard<TMutexImpl> guard(&w.Mutex); + Buckets = w.Buckets; + WindowValue = w.WindowValue; + FirstElem = w.FirstElem; + PeriodStart = w.PeriodStart; + Length = w.Length; + MicroSecondsPerBucket = w.MicroSecondsPerBucket; + } + + TSlidingWindow(TSlidingWindow&&) = default; + + TSlidingWindow& operator=(TSlidingWindow&&) = default; + TSlidingWindow& operator=(const TSlidingWindow&) = delete; + + // Period of time + const TDuration& GetDuration() const { + return Length; + } + + // Update window with new value and time + TValueType Update(TValueType val, TInstant t) { + TGuard<TMutexImpl> guard(&Mutex); + AdvanceTime(t); + UpdateCurrentBucket(val); + return WindowValue; + } + + // Update just time, without new values + TValueType Update(TInstant t) { + TGuard<TMutexImpl> guard(&Mutex); + AdvanceTime(t); + return WindowValue; + } + + // Get current window value (without updating current time) + TValueType GetValue() const { + TGuard<TMutexImpl> guard(&Mutex); + return WindowValue; + } + + private: + void UpdateCurrentBucket(TValueType val) { + const TSizeType arraySize = Buckets.size(); + const TSizeType pos = (FirstElem + arraySize - 1) % arraySize; + WindowValue = TOperation::UpdateBucket(WindowValue, Buckets, pos, val); + } + + void AdvanceTime(const TInstant& time) { + if (time < PeriodStart + Length) { + return; + } + + if (PeriodStart.MicroSeconds() == 0) { + PeriodStart = time - Length; + return; + } + + const TInstant& newPeriodStart = time - Length; + const ui64 tmDiff = (newPeriodStart - PeriodStart).MicroSeconds(); + const TSizeType bucketsDiff = tmDiff / MicroSecondsPerBucket; + const TSizeType arraySize = Buckets.size(); + const TSizeType buckets = Min(bucketsDiff, arraySize); + + WindowValue = TOperation::ClearBuckets(WindowValue, Buckets, FirstElem, buckets); + FirstElem = (FirstElem + buckets) % arraySize; + PeriodStart += TDuration::MicroSeconds(bucketsDiff * MicroSecondsPerBucket); + + // Check that PeriodStart lags behind newPeriodStart + // (which is actual, uptodate, precise and equal to time - Length) not more + // then MicroSecondsPerBucket + Y_ASSERT(newPeriodStart >= PeriodStart); + Y_ASSERT((newPeriodStart - PeriodStart).MicroSeconds() <= MicroSecondsPerBucket); + } + + + mutable TMutexImpl Mutex; + TValueVector Buckets; + TValueType WindowValue; + TSizeType FirstElem; + TInstant PeriodStart; + TDuration Length; + ui64 MicroSecondsPerBucket; + }; + +} diff --git a/library/cpp/sliding_window/sliding_window_ut.cpp b/library/cpp/sliding_window/sliding_window_ut.cpp new file mode 100644 index 0000000000..1e7343a8d3 --- /dev/null +++ b/library/cpp/sliding_window/sliding_window_ut.cpp @@ -0,0 +1,132 @@ +#include "sliding_window.h" + +#include <library/cpp/testing/unittest/registar.h> + +using namespace NSlidingWindow; + +Y_UNIT_TEST_SUITE(TSlidingWindowTest) { + Y_UNIT_TEST(TestSlidingWindowMax) { + TSlidingWindow<TMaxOperation<unsigned>> w(TDuration::Minutes(5), 5); + TInstant start = TInstant::MicroSeconds(TDuration::Hours(1).MicroSeconds()); + TInstant now = start; + w.Update(5, start); // ~ ~ ~ ~ 5 + UNIT_ASSERT_VALUES_EQUAL(w.GetValue(), 5); // ^ + now += TDuration::Minutes(1) + TDuration::Seconds(1); + w.Update(5, now); // 5 ~ ~ ~ 5 + UNIT_ASSERT_VALUES_EQUAL(w.GetValue(), 5); // ^ + now += TDuration::Minutes(1); + w.Update(3, now); // 5 3 ~ ~ 5 + UNIT_ASSERT_VALUES_EQUAL(w.GetValue(), 5); // ^ + now += TDuration::Minutes(3); + w.Update(2, now); // 5 3 ~ ~ 2 + UNIT_ASSERT_VALUES_EQUAL(w.GetValue(), 5); // ^ + now += TDuration::Minutes(1); + w.Update(2, now); // 2 3 ~ ~ 2 + UNIT_ASSERT_VALUES_EQUAL(w.GetValue(), 3); // ^ + now += TDuration::Minutes(1); + w.Update(2, now); // 2 2 ~ ~ 2 + UNIT_ASSERT_VALUES_EQUAL(w.GetValue(), 2); // ^ + now += TDuration::Minutes(5); + w.Update(1, now); // ~ 1 ~ ~ ~ + UNIT_ASSERT_VALUES_EQUAL(w.GetValue(), 1); // ^ + + // update current bucket + w.Update(2, now); // ~ 2 ~ ~ ~ + UNIT_ASSERT_VALUES_EQUAL(w.GetValue(), 2); // ^ + + w.Update(1, now + TDuration::Seconds(30)); // ~ 2 ~ ~ ~ + UNIT_ASSERT_VALUES_EQUAL(w.GetValue(), 2); // ^ + + // test idle + now += TDuration::Minutes(1); + w.Update(now); // ~ 2 ~ ~ ~ + UNIT_ASSERT_VALUES_EQUAL(w.GetValue(), 2); // ^ + + now += TDuration::Minutes(5); // ~ ~ ~ ~ ~ + UNIT_ASSERT_VALUES_EQUAL(w.Update(now), 0); + } + + Y_UNIT_TEST(TestSlidingWindowMin) { + TSlidingWindow<TMinOperation<unsigned>> w(TDuration::Minutes(5), 5); + TInstant start = TInstant::MicroSeconds(TDuration::Hours(1).MicroSeconds()); + TInstant now = start; + w.Update(5, start); // ~ ~ ~ ~ 5 + UNIT_ASSERT_VALUES_EQUAL(w.GetValue(), 5); // ^ + now += TDuration::Minutes(1) + TDuration::Seconds(1); + w.Update(5, now); // 5 ~ ~ ~ 5 + UNIT_ASSERT_VALUES_EQUAL(w.GetValue(), 5); // ^ + now += TDuration::Minutes(1); + w.Update(7, now); // 5 7 ~ ~ 5 + UNIT_ASSERT_VALUES_EQUAL(w.GetValue(), 5); // ^ + now += TDuration::Minutes(3); + w.Update(8, now); // 5 7 ~ ~ 8 + UNIT_ASSERT_VALUES_EQUAL(w.GetValue(), 5); // ^ + now += TDuration::Minutes(1); + w.Update(8, now); // 8 7 ~ ~ 8 + UNIT_ASSERT_VALUES_EQUAL(w.GetValue(), 7); // ^ + now += TDuration::Minutes(1); + w.Update(8, now); // 8 8 ~ ~ 8 + UNIT_ASSERT_VALUES_EQUAL(w.GetValue(), 8); // ^ + now += TDuration::Minutes(5); + w.Update(6, now); // ~ 6 ~ ~ ~ + UNIT_ASSERT_VALUES_EQUAL(w.GetValue(), 6); // ^ + + // update current bucket + w.Update(5, now); // ~ 5 ~ ~ ~ + UNIT_ASSERT_VALUES_EQUAL(w.GetValue(), 5); // ^ + + w.Update(6, now + TDuration::Seconds(30)); // ~ 5 ~ ~ ~ + UNIT_ASSERT_VALUES_EQUAL(w.GetValue(), 5); // ^ + + // test idle + now += TDuration::Minutes(1); + w.Update(now); // ~ 5 ~ ~ ~ + UNIT_ASSERT_VALUES_EQUAL(w.GetValue(), 5); // ^ + + now += TDuration::Minutes(5); // ~ ~ ~ ~ ~ + UNIT_ASSERT_VALUES_EQUAL(w.Update(now), std::numeric_limits<unsigned>::max()); + } + + Y_UNIT_TEST(TestSlidingWindowSum) { + TSlidingWindow<TSumOperation<unsigned>> w(TDuration::Minutes(5), 5); + UNIT_ASSERT_VALUES_EQUAL(w.GetValue(), 0); // current sum + + TInstant start = TInstant::MicroSeconds(TDuration::Hours(1).MicroSeconds()); + TInstant now = start; + w.Update(5, start); // 0 0 0 0 5 + UNIT_ASSERT_VALUES_EQUAL(w.GetValue(), 5); // ^ + now += TDuration::Minutes(1) + TDuration::Seconds(1); + w.Update(5, now); // 5 0 0 0 5 + UNIT_ASSERT_VALUES_EQUAL(w.GetValue(), 10); // ^ + now += TDuration::Minutes(1); + w.Update(3, now); // 5 3 0 0 5 + UNIT_ASSERT_VALUES_EQUAL(w.GetValue(), 13); // ^ + now += TDuration::Minutes(3); + w.Update(2, now); // 5 3 0 0 2 + UNIT_ASSERT_VALUES_EQUAL(w.GetValue(), 10); // ^ + now += TDuration::Minutes(1); + w.Update(2, now); // 2 3 0 0 2 + UNIT_ASSERT_VALUES_EQUAL(w.GetValue(), 7); // ^ + now += TDuration::Minutes(1); + w.Update(2, now); // 2 2 0 0 2 + UNIT_ASSERT_VALUES_EQUAL(w.GetValue(), 6); // ^ + now += TDuration::Minutes(5); + w.Update(1, now); // 0 1 0 0 0 + UNIT_ASSERT_VALUES_EQUAL(w.GetValue(), 1); // ^ + + // update current bucket + w.Update(2, now); // 0 3 0 0 0 + UNIT_ASSERT_VALUES_EQUAL(w.GetValue(), 3); // ^ + + w.Update(1, now + TDuration::Seconds(30)); // 0 4 0 0 0 + UNIT_ASSERT_VALUES_EQUAL(w.GetValue(), 4); // ^ + + // test idle + now += TDuration::Minutes(1); + w.Update(now); // 0 4 0 0 0 + UNIT_ASSERT_VALUES_EQUAL(w.GetValue(), 4); // ^ + + now += TDuration::Minutes(5); // 0 0 0 0 0 + UNIT_ASSERT_VALUES_EQUAL(w.Update(now), 0); + } +} diff --git a/library/cpp/sliding_window/ut/ya.make b/library/cpp/sliding_window/ut/ya.make new file mode 100644 index 0000000000..3839a8dadc --- /dev/null +++ b/library/cpp/sliding_window/ut/ya.make @@ -0,0 +1,9 @@ +UNITTEST_FOR(library/cpp/sliding_window) + +OWNER(g:kikimr) + +SRCS( + sliding_window_ut.cpp +) + +END() diff --git a/library/cpp/sliding_window/ya.make b/library/cpp/sliding_window/ya.make new file mode 100644 index 0000000000..79aeaa06bb --- /dev/null +++ b/library/cpp/sliding_window/ya.make @@ -0,0 +1,10 @@ +LIBRARY() + +OWNER(g:kikimr) + +SRCS( + sliding_window.cpp + sliding_window.h +) + +END() |