diff options
author | Devtools Arcadia <arcadia-devtools@yandex-team.ru> | 2022-02-07 18:08:42 +0300 |
---|---|---|
committer | Devtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net> | 2022-02-07 18:08:42 +0300 |
commit | 1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch) | |
tree | e26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/bucket_quoter | |
download | ydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz |
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/bucket_quoter')
-rw-r--r-- | library/cpp/bucket_quoter/bucket_quoter.cpp | 1 | ||||
-rw-r--r-- | library/cpp/bucket_quoter/bucket_quoter.h | 281 | ||||
-rw-r--r-- | library/cpp/bucket_quoter/ut/README.md | 20 | ||||
-rw-r--r-- | library/cpp/bucket_quoter/ut/main.cpp | 44 | ||||
-rw-r--r-- | library/cpp/bucket_quoter/ut/test_namespace.cpp | 13 | ||||
-rw-r--r-- | library/cpp/bucket_quoter/ut/test_namespace.h | 83 | ||||
-rw-r--r-- | library/cpp/bucket_quoter/ut/ya.make | 18 | ||||
-rw-r--r-- | library/cpp/bucket_quoter/ya.make | 11 |
8 files changed, 471 insertions, 0 deletions
diff --git a/library/cpp/bucket_quoter/bucket_quoter.cpp b/library/cpp/bucket_quoter/bucket_quoter.cpp new file mode 100644 index 0000000000..e7f8b1a869 --- /dev/null +++ b/library/cpp/bucket_quoter/bucket_quoter.cpp @@ -0,0 +1 @@ +#include "bucket_quoter.h" diff --git a/library/cpp/bucket_quoter/bucket_quoter.h b/library/cpp/bucket_quoter/bucket_quoter.h new file mode 100644 index 0000000000..3d92ef8450 --- /dev/null +++ b/library/cpp/bucket_quoter/bucket_quoter.h @@ -0,0 +1,281 @@ +#pragma once + +#include <util/datetime/base.h> +#include <util/system/mutex.h> +#include <util/system/hp_timer.h> + +/* Token bucket. + * Makes flow of *inflow* units per second in average, with up to *capacity* bursts. + * Do not use for STRICT flow control. + */ + +/* samples: create and use quoter sending 1000 bytes per second on average, + with up to 60 seconds quota buildup. + + TBucketQuoter quoter(1000, 60000, NULL, NULL, NULL); + + for (;;) { + T *msg = get_message(); + + quoter.Sleep(); + quoter.Use(msg->GetSize()); + send_message(msg); + } + + ---------------------------- + + TBucketQuoter quoter(1000, 60000, NULL, NULL, NULL); + + for (;;) { + T *msg = get_message(); + + while (! quoter.IsAvail()) { + // do something else + } + + quoter.Use(msg->GetSize()); + send_message(msg); + } + +*/ + +struct TInstantTimerMs { + using TTime = TInstant; + static constexpr ui64 Resolution = 1000ull; // milliseconds + static TTime Now() { + return TInstant::Now(); + } + static ui64 Duration(TTime from, TTime to) { + return (to - from).MilliSeconds(); + } +}; + +struct THPTimerUs { + using TTime = NHPTimer::STime; + static constexpr ui64 Resolution = 1000000ull; // microseconds + static TTime Now() { + NHPTimer::STime ret; + NHPTimer::GetTime(&ret); + return ret; + } + static ui64 Duration(TTime from, TTime to) { + i64 cycles = to - from; + if (cycles > 0) { + return ui64(double(cycles) * double(Resolution) / NHPTimer::GetClockRate()); + } else { + return 0; + } + } +}; + +template <typename StatCounter, typename Lock = TMutex, typename Timer = TInstantTimerMs> +class TBucketQuoter { +public: + using TTime = typename Timer::TTime; + + struct TResult { + i64 Before; + i64 After; + ui64 Seqno; + }; + + /* fixed quota */ + TBucketQuoter(ui64 inflow, ui64 capacity, StatCounter* msgPassed = nullptr, + StatCounter* bucketUnderflows = nullptr, StatCounter* tokensUsed = nullptr, + StatCounter* usecWaited = nullptr, bool fill = false, StatCounter* aggregateInflow = nullptr) + : MsgPassed(msgPassed) + , BucketUnderflows(bucketUnderflows) + , TokensUsed(tokensUsed) + , UsecWaited(usecWaited) + , AggregateInflow(aggregateInflow) + , Bucket(fill ? capacity : 0) + , LastAdd(Timer::Now()) + , InflowTokensPerSecond(&FixedInflow) + , BucketTokensCapacity(&FixedCapacity) + , FixedInflow(inflow) + , FixedCapacity(capacity) + { + /* no-op */ + } + + /* adjustable quotas */ + TBucketQuoter(TAtomic* inflow, TAtomic* capacity, StatCounter* msgPassed = nullptr, + StatCounter* bucketUnderflows = nullptr, StatCounter* tokensUsed = nullptr, + StatCounter* usecWaited = nullptr, bool fill = false, StatCounter* aggregateInflow = nullptr) + : MsgPassed(msgPassed) + , BucketUnderflows(bucketUnderflows) + , TokensUsed(tokensUsed) + , UsecWaited(usecWaited) + , AggregateInflow(aggregateInflow) + , Bucket(fill ? AtomicGet(*capacity) : 0) + , LastAdd(Timer::Now()) + , InflowTokensPerSecond(inflow) + , BucketTokensCapacity(capacity) + { + /* no-op */ + } + + bool IsAvail() { + TGuard<Lock> g(BucketMutex); + FillBucket(); + if (Bucket < 0) { + if (BucketUnderflows) { + (*BucketUnderflows)++; + } + } + return (Bucket >= 0); + } + + bool IsAvail(TResult& res) { + TGuard<Lock> g(BucketMutex); + res.Before = Bucket; + FillBucket(); + res.After = Bucket; + res.Seqno = ++Seqno; + if (Bucket < 0) { + if (BucketUnderflows) { + (*BucketUnderflows)++; + } + } + return (Bucket >= 0); + } + + ui64 GetAvail() { + TGuard<Lock> g(BucketMutex); + FillBucket(); + return Max<i64>(0, Bucket); + } + + ui64 GetAvail(TResult& res) { + TGuard<Lock> g(BucketMutex); + res.Before = Bucket; + FillBucket(); + res.After = Bucket; + res.Seqno = ++Seqno; + return Max<i64>(0, Bucket); + } + + void Use(ui64 tokens, bool sleep = false) { + TGuard<Lock> g(BucketMutex); + UseNoLock(tokens, sleep); + } + + void Use(ui64 tokens, TResult& res, bool sleep = false) { + TGuard<Lock> g(BucketMutex); + res.Before = Bucket; + UseNoLock(tokens, sleep); + res.After = Bucket; + res.Seqno = ++Seqno; + } + + i64 UseAndFill(ui64 tokens) { + TGuard<Lock> g(BucketMutex); + UseNoLock(tokens); + FillBucket(); + return Bucket; + } + + void Add(ui64 tokens) { + TGuard<Lock> g(BucketMutex); + AddNoLock(tokens); + } + + void Add(ui64 tokens, TResult& res) { + TGuard<Lock> g(BucketMutex); + res.Before = Bucket; + AddNoLock(tokens); + res.After = Bucket; + res.Seqno = ++Seqno; + } + + ui32 GetWaitTime() { + TGuard<Lock> g(BucketMutex); + + FillBucket(); + if (Bucket >= 0) { + return 0; + } + + ui32 usec = (-Bucket * 1000000) / (*InflowTokensPerSecond); + return usec; + } + + ui32 GetWaitTime(TResult& res) { + TGuard<Lock> g(BucketMutex); + res.Before = Bucket; + FillBucket(); + res.After = Bucket; + res.Seqno = ++Seqno; + if (Bucket >= 0) { + return 0; + } + ui32 usec = (-Bucket * 1000000) / (*InflowTokensPerSecond); + return usec; + } + + void Sleep() { + while (!IsAvail()) { + ui32 delay = GetWaitTime(); + if (delay != 0) { + usleep(delay); + if (UsecWaited) { + (*UsecWaited) += delay; + } + } + } + } + +private: + void FillBucket() { + TTime now = Timer::Now(); + + ui64 elapsed = Timer::Duration(LastAdd, now); + if (*InflowTokensPerSecond * elapsed >= Timer::Resolution) { + ui64 inflow = *InflowTokensPerSecond * elapsed / Timer::Resolution; + if (AggregateInflow) { + *AggregateInflow += inflow; + } + Bucket += inflow; + if (Bucket > *BucketTokensCapacity) { + Bucket = *BucketTokensCapacity; + } + + LastAdd = now; + } + } + + void UseNoLock(ui64 tokens, bool sleep = false) { + if (sleep) + Sleep(); + Bucket -= tokens; + if (TokensUsed) { + (*TokensUsed) += tokens; + } + if (MsgPassed) { + (*MsgPassed)++; + } + } + + void AddNoLock(ui64 tokens) { + Bucket += tokens; + if (Bucket > *BucketTokensCapacity) { + Bucket = *BucketTokensCapacity; + } + } + + StatCounter* MsgPassed; + StatCounter* BucketUnderflows; + StatCounter* TokensUsed; + StatCounter* UsecWaited; + StatCounter* AggregateInflow; + + i64 Bucket; + TTime LastAdd; + Lock BucketMutex; + ui64 Seqno = 0; + + TAtomic* InflowTokensPerSecond; + TAtomic* BucketTokensCapacity; + TAtomic FixedInflow; + TAtomic FixedCapacity; +}; diff --git a/library/cpp/bucket_quoter/ut/README.md b/library/cpp/bucket_quoter/ut/README.md new file mode 100644 index 0000000000..a0db9f6239 --- /dev/null +++ b/library/cpp/bucket_quoter/ut/README.md @@ -0,0 +1,20 @@ +# Unit test для bucket_quoter + +Этот тест предназначен не для проверки корректности bucket_quoter, а для того, +чтобы показать некоторые его недостатки + +Первый недостаток: При вызове метода FillBucket, вычисляется количество токенов, +которое можно добавить в Bucket за время, прошедшее с последнего добавления. +Если это количество нецелое, то оно округляется вниз. Это может иметь значение для малых RPS. + +Второй недостаток: +При попытке получить время через GetWaitTime, возвращается не ближайшее время, когда станут доступны новые токены. +Возвращаемое время кратно (1 / RPS), выраженному в микросекундах. +Кроме того, из-за округления, метод может вернуть время, в котором новые токены все еще не смогут быть начислены. + +Написанный тест демонстрирует, что данные недостатки могут приводить значительному снижению +количества пропускаемых лимитером запросов(вплоть до двух раз в специальных условиях с искусственным таймером). + +Так же демонстрируется, что при использовании лимитера со стандартным таймером(TInstantTimerMs), RPS тоже может +достаточно далек от заданного. + diff --git a/library/cpp/bucket_quoter/ut/main.cpp b/library/cpp/bucket_quoter/ut/main.cpp new file mode 100644 index 0000000000..9c86bace4e --- /dev/null +++ b/library/cpp/bucket_quoter/ut/main.cpp @@ -0,0 +1,44 @@ +#include <library/cpp/testing/unittest/registar.h> +#include <library/cpp/bucket_quoter/bucket_quoter.h> + +#include "test_namespace.h" + + +using NBucketQuoterTest::TMockTimer; + +#define TEST_RPS_EXACT(THREADS, RPS, EXPECTED_RESULT, TIMER) \ +Y_UNIT_TEST(Test##TIMER##RPS##Rps##THREADS##Threads) { \ + ui32 seconds = 10; \ + ui32 summary_requests = NBucketQuoterTest::Run<TIMER, NBucketQuoterTest::TTestScenario>\ + (THREADS, RPS, seconds);\ + ui32 total_rps = summary_requests / seconds; \ + UNIT_ASSERT_EQUAL(total_rps, EXPECTED_RESULT);\ +} + +#define TEST_RPS_LESS(THREADS, RPS, UPPER_BOUND, TIMER) \ +Y_UNIT_TEST(Test##TIMER##RPS##rps##THREADS##threads) { \ + ui32 seconds = 10; \ + ui32 summary_requests = NBucketQuoterTest::Run<TIMER, NBucketQuoterTest::TTestScenario>\ + (THREADS, RPS, seconds); \ + ui32 total_rps = summary_requests / seconds; \ + UNIT_ASSERT_LE(total_rps, UPPER_BOUND); \ +} + +Y_UNIT_TEST_SUITE(TMockTimerTest) { + TEST_RPS_EXACT(1, 100, 100, TMockTimer) + TEST_RPS_EXACT(1, 120, 60, TMockTimer) + TEST_RPS_EXACT(1, 200, 200, TMockTimer) + TEST_RPS_EXACT(1, 220, 110, TMockTimer) + TEST_RPS_EXACT(1, 240, 120, TMockTimer) + TEST_RPS_EXACT(1, 300, 150, TMockTimer) + TEST_RPS_EXACT(1, 320, 320, TMockTimer) + TEST_RPS_EXACT(1, 480, 240, TMockTimer) + +} + +Y_UNIT_TEST_SUITE(TInstantTimerTest) { + TEST_RPS_LESS(1, 360, 300, TInstantTimerMs) + TEST_RPS_LESS(1, 480, 400, TInstantTimerMs) + TEST_RPS_LESS(5, 420, 350, TInstantTimerMs) + TEST_RPS_LESS(5, 220, 185, TInstantTimerMs) +} diff --git a/library/cpp/bucket_quoter/ut/test_namespace.cpp b/library/cpp/bucket_quoter/ut/test_namespace.cpp new file mode 100644 index 0000000000..ee6c84796c --- /dev/null +++ b/library/cpp/bucket_quoter/ut/test_namespace.cpp @@ -0,0 +1,13 @@ +#include "test_namespace.h" + +namespace NBucketQuoterTest { + + TMockTimer::TTime TMockTimer::CurrentTime = 0; + + template <> + void Sleep<TMockTimer>(TDuration duration) { + TMockTimer::Sleep(duration); + } + +} + diff --git a/library/cpp/bucket_quoter/ut/test_namespace.h b/library/cpp/bucket_quoter/ut/test_namespace.h new file mode 100644 index 0000000000..8d8dc1f2b3 --- /dev/null +++ b/library/cpp/bucket_quoter/ut/test_namespace.h @@ -0,0 +1,83 @@ +#pragma once + +#include <library/cpp/bucket_quoter/bucket_quoter.h> +#include <library/cpp/getopt/last_getopt.h> + +#include <util/thread/pool.h> +#include <util/string/printf.h> +#include <util/system/types.h> +#include <util/stream/output.h> +#include <util/datetime/base.h> +#include <util/digest/fnv.h> +#include <vector> +#include <thread> +#include <numeric> + +namespace NBucketQuoterTest { + + // thread unsafe + struct TMockTimer { + using TTime = i64; + static constexpr ui64 Resolution = 1'000'000ull; // microseconds + static TTime CurrentTime; + + static TMockTimer::TTime Now() { + return CurrentTime; + } + + static ui64 Duration(TMockTimer::TTime from, TMockTimer::TTime to) { + return to - from; + } + + static void Sleep(TDuration duration) { + CurrentTime += duration.MicroSeconds(); + } + }; + + template <typename Timer> + void Sleep(TDuration duration) { + ::Sleep(duration); + } + + template <> + void Sleep<TMockTimer>(TDuration duration); + + template <class Timer> + using QuoterTemplate = TBucketQuoter<i64, TMutex, Timer>; + + template<class Timer> + struct TTestScenario { + void operator () (TBucketQuoter<i64, TMutex, Timer>& quoter, ui64 work_time, i64 wait_time, ui64& store_result) const { + typename Timer::TTime start = Timer::Now(); + work_time *= Timer::Resolution; + while (Timer::Duration(start, Timer::Now()) < work_time) { + while(!quoter.IsAvail()) { + NBucketQuoterTest::Sleep<Timer>(TDuration::MicroSeconds(quoter.GetWaitTime())); + } + quoter.Use(1); + ++store_result; + if (wait_time != 0) { + NBucketQuoterTest::Sleep<Timer>(TDuration::MicroSeconds(wait_time)); + } + } + } + }; + + template <class Timer, template <class TT> class Scenario> + ui32 Run(ui64 thread_count, ui64 rps, ui64 seconds, i64 wait_time = 0) { + TBucketQuoter<i64, TMutex, Timer> quoter(rps, rps); + std::vector<std::thread> threads; + std::vector<ui64> results; + threads.reserve(thread_count); + results.reserve(thread_count); + for (ui32 i = 0; i < thread_count; ++i) { + results.emplace_back(0); + threads.emplace_back(Scenario<Timer>{}, std::ref(quoter), seconds, wait_time, std::ref(results.back())); + } + for (ui32 i = 0; i < thread_count; ++i) { + threads[i].join(); + } + return std::reduce(results.begin(), results.end(), 0, std::plus<>()); + } + +}
\ No newline at end of file diff --git a/library/cpp/bucket_quoter/ut/ya.make b/library/cpp/bucket_quoter/ut/ya.make new file mode 100644 index 0000000000..774b85e623 --- /dev/null +++ b/library/cpp/bucket_quoter/ut/ya.make @@ -0,0 +1,18 @@ +UNITTEST() + +OWNER( + g:kikimr + svc +) + +FORK_SUBTESTS() +SRCS( + main.cpp + test_namespace.cpp +) +PEERDIR( + library/cpp/bucket_quoter + library/cpp/getopt +) + +END() diff --git a/library/cpp/bucket_quoter/ya.make b/library/cpp/bucket_quoter/ya.make new file mode 100644 index 0000000000..49c407b502 --- /dev/null +++ b/library/cpp/bucket_quoter/ya.make @@ -0,0 +1,11 @@ +LIBRARY() + +OWNER(serxa) + +SRCS( + bucket_quoter.cpp +) + +END() + +RECURSE_FOR_TESTS(ut) |