aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/bucket_quoter
diff options
context:
space:
mode:
authorDevtools Arcadia <arcadia-devtools@yandex-team.ru>2022-02-07 18:08:42 +0300
committerDevtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net>2022-02-07 18:08:42 +0300
commit1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch)
treee26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/bucket_quoter
downloadydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/bucket_quoter')
-rw-r--r--library/cpp/bucket_quoter/bucket_quoter.cpp1
-rw-r--r--library/cpp/bucket_quoter/bucket_quoter.h281
-rw-r--r--library/cpp/bucket_quoter/ut/README.md20
-rw-r--r--library/cpp/bucket_quoter/ut/main.cpp44
-rw-r--r--library/cpp/bucket_quoter/ut/test_namespace.cpp13
-rw-r--r--library/cpp/bucket_quoter/ut/test_namespace.h83
-rw-r--r--library/cpp/bucket_quoter/ut/ya.make18
-rw-r--r--library/cpp/bucket_quoter/ya.make11
8 files changed, 471 insertions, 0 deletions
diff --git a/library/cpp/bucket_quoter/bucket_quoter.cpp b/library/cpp/bucket_quoter/bucket_quoter.cpp
new file mode 100644
index 0000000000..e7f8b1a869
--- /dev/null
+++ b/library/cpp/bucket_quoter/bucket_quoter.cpp
@@ -0,0 +1 @@
+#include "bucket_quoter.h"
diff --git a/library/cpp/bucket_quoter/bucket_quoter.h b/library/cpp/bucket_quoter/bucket_quoter.h
new file mode 100644
index 0000000000..3d92ef8450
--- /dev/null
+++ b/library/cpp/bucket_quoter/bucket_quoter.h
@@ -0,0 +1,281 @@
+#pragma once
+
+#include <util/datetime/base.h>
+#include <util/system/mutex.h>
+#include <util/system/hp_timer.h>
+
+/* Token bucket.
+ * Makes flow of *inflow* units per second in average, with up to *capacity* bursts.
+ * Do not use for STRICT flow control.
+ */
+
+/* samples: create and use quoter sending 1000 bytes per second on average,
+ with up to 60 seconds quota buildup.
+
+ TBucketQuoter quoter(1000, 60000, NULL, NULL, NULL);
+
+ for (;;) {
+ T *msg = get_message();
+
+ quoter.Sleep();
+ quoter.Use(msg->GetSize());
+ send_message(msg);
+ }
+
+ ----------------------------
+
+ TBucketQuoter quoter(1000, 60000, NULL, NULL, NULL);
+
+ for (;;) {
+ T *msg = get_message();
+
+ while (! quoter.IsAvail()) {
+ // do something else
+ }
+
+ quoter.Use(msg->GetSize());
+ send_message(msg);
+ }
+
+*/
+
+struct TInstantTimerMs {
+ using TTime = TInstant;
+ static constexpr ui64 Resolution = 1000ull; // milliseconds
+ static TTime Now() {
+ return TInstant::Now();
+ }
+ static ui64 Duration(TTime from, TTime to) {
+ return (to - from).MilliSeconds();
+ }
+};
+
+struct THPTimerUs {
+ using TTime = NHPTimer::STime;
+ static constexpr ui64 Resolution = 1000000ull; // microseconds
+ static TTime Now() {
+ NHPTimer::STime ret;
+ NHPTimer::GetTime(&ret);
+ return ret;
+ }
+ static ui64 Duration(TTime from, TTime to) {
+ i64 cycles = to - from;
+ if (cycles > 0) {
+ return ui64(double(cycles) * double(Resolution) / NHPTimer::GetClockRate());
+ } else {
+ return 0;
+ }
+ }
+};
+
+template <typename StatCounter, typename Lock = TMutex, typename Timer = TInstantTimerMs>
+class TBucketQuoter {
+public:
+ using TTime = typename Timer::TTime;
+
+ struct TResult {
+ i64 Before;
+ i64 After;
+ ui64 Seqno;
+ };
+
+ /* fixed quota */
+ TBucketQuoter(ui64 inflow, ui64 capacity, StatCounter* msgPassed = nullptr,
+ StatCounter* bucketUnderflows = nullptr, StatCounter* tokensUsed = nullptr,
+ StatCounter* usecWaited = nullptr, bool fill = false, StatCounter* aggregateInflow = nullptr)
+ : MsgPassed(msgPassed)
+ , BucketUnderflows(bucketUnderflows)
+ , TokensUsed(tokensUsed)
+ , UsecWaited(usecWaited)
+ , AggregateInflow(aggregateInflow)
+ , Bucket(fill ? capacity : 0)
+ , LastAdd(Timer::Now())
+ , InflowTokensPerSecond(&FixedInflow)
+ , BucketTokensCapacity(&FixedCapacity)
+ , FixedInflow(inflow)
+ , FixedCapacity(capacity)
+ {
+ /* no-op */
+ }
+
+ /* adjustable quotas */
+ TBucketQuoter(TAtomic* inflow, TAtomic* capacity, StatCounter* msgPassed = nullptr,
+ StatCounter* bucketUnderflows = nullptr, StatCounter* tokensUsed = nullptr,
+ StatCounter* usecWaited = nullptr, bool fill = false, StatCounter* aggregateInflow = nullptr)
+ : MsgPassed(msgPassed)
+ , BucketUnderflows(bucketUnderflows)
+ , TokensUsed(tokensUsed)
+ , UsecWaited(usecWaited)
+ , AggregateInflow(aggregateInflow)
+ , Bucket(fill ? AtomicGet(*capacity) : 0)
+ , LastAdd(Timer::Now())
+ , InflowTokensPerSecond(inflow)
+ , BucketTokensCapacity(capacity)
+ {
+ /* no-op */
+ }
+
+ bool IsAvail() {
+ TGuard<Lock> g(BucketMutex);
+ FillBucket();
+ if (Bucket < 0) {
+ if (BucketUnderflows) {
+ (*BucketUnderflows)++;
+ }
+ }
+ return (Bucket >= 0);
+ }
+
+ bool IsAvail(TResult& res) {
+ TGuard<Lock> g(BucketMutex);
+ res.Before = Bucket;
+ FillBucket();
+ res.After = Bucket;
+ res.Seqno = ++Seqno;
+ if (Bucket < 0) {
+ if (BucketUnderflows) {
+ (*BucketUnderflows)++;
+ }
+ }
+ return (Bucket >= 0);
+ }
+
+ ui64 GetAvail() {
+ TGuard<Lock> g(BucketMutex);
+ FillBucket();
+ return Max<i64>(0, Bucket);
+ }
+
+ ui64 GetAvail(TResult& res) {
+ TGuard<Lock> g(BucketMutex);
+ res.Before = Bucket;
+ FillBucket();
+ res.After = Bucket;
+ res.Seqno = ++Seqno;
+ return Max<i64>(0, Bucket);
+ }
+
+ void Use(ui64 tokens, bool sleep = false) {
+ TGuard<Lock> g(BucketMutex);
+ UseNoLock(tokens, sleep);
+ }
+
+ void Use(ui64 tokens, TResult& res, bool sleep = false) {
+ TGuard<Lock> g(BucketMutex);
+ res.Before = Bucket;
+ UseNoLock(tokens, sleep);
+ res.After = Bucket;
+ res.Seqno = ++Seqno;
+ }
+
+ i64 UseAndFill(ui64 tokens) {
+ TGuard<Lock> g(BucketMutex);
+ UseNoLock(tokens);
+ FillBucket();
+ return Bucket;
+ }
+
+ void Add(ui64 tokens) {
+ TGuard<Lock> g(BucketMutex);
+ AddNoLock(tokens);
+ }
+
+ void Add(ui64 tokens, TResult& res) {
+ TGuard<Lock> g(BucketMutex);
+ res.Before = Bucket;
+ AddNoLock(tokens);
+ res.After = Bucket;
+ res.Seqno = ++Seqno;
+ }
+
+ ui32 GetWaitTime() {
+ TGuard<Lock> g(BucketMutex);
+
+ FillBucket();
+ if (Bucket >= 0) {
+ return 0;
+ }
+
+ ui32 usec = (-Bucket * 1000000) / (*InflowTokensPerSecond);
+ return usec;
+ }
+
+ ui32 GetWaitTime(TResult& res) {
+ TGuard<Lock> g(BucketMutex);
+ res.Before = Bucket;
+ FillBucket();
+ res.After = Bucket;
+ res.Seqno = ++Seqno;
+ if (Bucket >= 0) {
+ return 0;
+ }
+ ui32 usec = (-Bucket * 1000000) / (*InflowTokensPerSecond);
+ return usec;
+ }
+
+ void Sleep() {
+ while (!IsAvail()) {
+ ui32 delay = GetWaitTime();
+ if (delay != 0) {
+ usleep(delay);
+ if (UsecWaited) {
+ (*UsecWaited) += delay;
+ }
+ }
+ }
+ }
+
+private:
+ void FillBucket() {
+ TTime now = Timer::Now();
+
+ ui64 elapsed = Timer::Duration(LastAdd, now);
+ if (*InflowTokensPerSecond * elapsed >= Timer::Resolution) {
+ ui64 inflow = *InflowTokensPerSecond * elapsed / Timer::Resolution;
+ if (AggregateInflow) {
+ *AggregateInflow += inflow;
+ }
+ Bucket += inflow;
+ if (Bucket > *BucketTokensCapacity) {
+ Bucket = *BucketTokensCapacity;
+ }
+
+ LastAdd = now;
+ }
+ }
+
+ void UseNoLock(ui64 tokens, bool sleep = false) {
+ if (sleep)
+ Sleep();
+ Bucket -= tokens;
+ if (TokensUsed) {
+ (*TokensUsed) += tokens;
+ }
+ if (MsgPassed) {
+ (*MsgPassed)++;
+ }
+ }
+
+ void AddNoLock(ui64 tokens) {
+ Bucket += tokens;
+ if (Bucket > *BucketTokensCapacity) {
+ Bucket = *BucketTokensCapacity;
+ }
+ }
+
+ StatCounter* MsgPassed;
+ StatCounter* BucketUnderflows;
+ StatCounter* TokensUsed;
+ StatCounter* UsecWaited;
+ StatCounter* AggregateInflow;
+
+ i64 Bucket;
+ TTime LastAdd;
+ Lock BucketMutex;
+ ui64 Seqno = 0;
+
+ TAtomic* InflowTokensPerSecond;
+ TAtomic* BucketTokensCapacity;
+ TAtomic FixedInflow;
+ TAtomic FixedCapacity;
+};
diff --git a/library/cpp/bucket_quoter/ut/README.md b/library/cpp/bucket_quoter/ut/README.md
new file mode 100644
index 0000000000..a0db9f6239
--- /dev/null
+++ b/library/cpp/bucket_quoter/ut/README.md
@@ -0,0 +1,20 @@
+# Unit test для bucket_quoter
+
+Этот тест предназначен не для проверки корректности bucket_quoter, а для того,
+чтобы показать некоторые его недостатки
+
+Первый недостаток: При вызове метода FillBucket, вычисляется количество токенов,
+которое можно добавить в Bucket за время, прошедшее с последнего добавления.
+Если это количество нецелое, то оно округляется вниз. Это может иметь значение для малых RPS.
+
+Второй недостаток:
+При попытке получить время через GetWaitTime, возвращается не ближайшее время, когда станут доступны новые токены.
+Возвращаемое время кратно (1 / RPS), выраженному в микросекундах.
+Кроме того, из-за округления, метод может вернуть время, в котором новые токены все еще не смогут быть начислены.
+
+Написанный тест демонстрирует, что данные недостатки могут приводить значительному снижению
+количества пропускаемых лимитером запросов(вплоть до двух раз в специальных условиях с искусственным таймером).
+
+Так же демонстрируется, что при использовании лимитера со стандартным таймером(TInstantTimerMs), RPS тоже может
+достаточно далек от заданного.
+
diff --git a/library/cpp/bucket_quoter/ut/main.cpp b/library/cpp/bucket_quoter/ut/main.cpp
new file mode 100644
index 0000000000..9c86bace4e
--- /dev/null
+++ b/library/cpp/bucket_quoter/ut/main.cpp
@@ -0,0 +1,44 @@
+#include <library/cpp/testing/unittest/registar.h>
+#include <library/cpp/bucket_quoter/bucket_quoter.h>
+
+#include "test_namespace.h"
+
+
+using NBucketQuoterTest::TMockTimer;
+
+#define TEST_RPS_EXACT(THREADS, RPS, EXPECTED_RESULT, TIMER) \
+Y_UNIT_TEST(Test##TIMER##RPS##Rps##THREADS##Threads) { \
+ ui32 seconds = 10; \
+ ui32 summary_requests = NBucketQuoterTest::Run<TIMER, NBucketQuoterTest::TTestScenario>\
+ (THREADS, RPS, seconds);\
+ ui32 total_rps = summary_requests / seconds; \
+ UNIT_ASSERT_EQUAL(total_rps, EXPECTED_RESULT);\
+}
+
+#define TEST_RPS_LESS(THREADS, RPS, UPPER_BOUND, TIMER) \
+Y_UNIT_TEST(Test##TIMER##RPS##rps##THREADS##threads) { \
+ ui32 seconds = 10; \
+ ui32 summary_requests = NBucketQuoterTest::Run<TIMER, NBucketQuoterTest::TTestScenario>\
+ (THREADS, RPS, seconds); \
+ ui32 total_rps = summary_requests / seconds; \
+ UNIT_ASSERT_LE(total_rps, UPPER_BOUND); \
+}
+
+Y_UNIT_TEST_SUITE(TMockTimerTest) {
+ TEST_RPS_EXACT(1, 100, 100, TMockTimer)
+ TEST_RPS_EXACT(1, 120, 60, TMockTimer)
+ TEST_RPS_EXACT(1, 200, 200, TMockTimer)
+ TEST_RPS_EXACT(1, 220, 110, TMockTimer)
+ TEST_RPS_EXACT(1, 240, 120, TMockTimer)
+ TEST_RPS_EXACT(1, 300, 150, TMockTimer)
+ TEST_RPS_EXACT(1, 320, 320, TMockTimer)
+ TEST_RPS_EXACT(1, 480, 240, TMockTimer)
+
+}
+
+Y_UNIT_TEST_SUITE(TInstantTimerTest) {
+ TEST_RPS_LESS(1, 360, 300, TInstantTimerMs)
+ TEST_RPS_LESS(1, 480, 400, TInstantTimerMs)
+ TEST_RPS_LESS(5, 420, 350, TInstantTimerMs)
+ TEST_RPS_LESS(5, 220, 185, TInstantTimerMs)
+}
diff --git a/library/cpp/bucket_quoter/ut/test_namespace.cpp b/library/cpp/bucket_quoter/ut/test_namespace.cpp
new file mode 100644
index 0000000000..ee6c84796c
--- /dev/null
+++ b/library/cpp/bucket_quoter/ut/test_namespace.cpp
@@ -0,0 +1,13 @@
+#include "test_namespace.h"
+
+namespace NBucketQuoterTest {
+
+ TMockTimer::TTime TMockTimer::CurrentTime = 0;
+
+ template <>
+ void Sleep<TMockTimer>(TDuration duration) {
+ TMockTimer::Sleep(duration);
+ }
+
+}
+
diff --git a/library/cpp/bucket_quoter/ut/test_namespace.h b/library/cpp/bucket_quoter/ut/test_namespace.h
new file mode 100644
index 0000000000..8d8dc1f2b3
--- /dev/null
+++ b/library/cpp/bucket_quoter/ut/test_namespace.h
@@ -0,0 +1,83 @@
+#pragma once
+
+#include <library/cpp/bucket_quoter/bucket_quoter.h>
+#include <library/cpp/getopt/last_getopt.h>
+
+#include <util/thread/pool.h>
+#include <util/string/printf.h>
+#include <util/system/types.h>
+#include <util/stream/output.h>
+#include <util/datetime/base.h>
+#include <util/digest/fnv.h>
+#include <vector>
+#include <thread>
+#include <numeric>
+
+namespace NBucketQuoterTest {
+
+ // thread unsafe
+ struct TMockTimer {
+ using TTime = i64;
+ static constexpr ui64 Resolution = 1'000'000ull; // microseconds
+ static TTime CurrentTime;
+
+ static TMockTimer::TTime Now() {
+ return CurrentTime;
+ }
+
+ static ui64 Duration(TMockTimer::TTime from, TMockTimer::TTime to) {
+ return to - from;
+ }
+
+ static void Sleep(TDuration duration) {
+ CurrentTime += duration.MicroSeconds();
+ }
+ };
+
+ template <typename Timer>
+ void Sleep(TDuration duration) {
+ ::Sleep(duration);
+ }
+
+ template <>
+ void Sleep<TMockTimer>(TDuration duration);
+
+ template <class Timer>
+ using QuoterTemplate = TBucketQuoter<i64, TMutex, Timer>;
+
+ template<class Timer>
+ struct TTestScenario {
+ void operator () (TBucketQuoter<i64, TMutex, Timer>& quoter, ui64 work_time, i64 wait_time, ui64& store_result) const {
+ typename Timer::TTime start = Timer::Now();
+ work_time *= Timer::Resolution;
+ while (Timer::Duration(start, Timer::Now()) < work_time) {
+ while(!quoter.IsAvail()) {
+ NBucketQuoterTest::Sleep<Timer>(TDuration::MicroSeconds(quoter.GetWaitTime()));
+ }
+ quoter.Use(1);
+ ++store_result;
+ if (wait_time != 0) {
+ NBucketQuoterTest::Sleep<Timer>(TDuration::MicroSeconds(wait_time));
+ }
+ }
+ }
+ };
+
+ template <class Timer, template <class TT> class Scenario>
+ ui32 Run(ui64 thread_count, ui64 rps, ui64 seconds, i64 wait_time = 0) {
+ TBucketQuoter<i64, TMutex, Timer> quoter(rps, rps);
+ std::vector<std::thread> threads;
+ std::vector<ui64> results;
+ threads.reserve(thread_count);
+ results.reserve(thread_count);
+ for (ui32 i = 0; i < thread_count; ++i) {
+ results.emplace_back(0);
+ threads.emplace_back(Scenario<Timer>{}, std::ref(quoter), seconds, wait_time, std::ref(results.back()));
+ }
+ for (ui32 i = 0; i < thread_count; ++i) {
+ threads[i].join();
+ }
+ return std::reduce(results.begin(), results.end(), 0, std::plus<>());
+ }
+
+} \ No newline at end of file
diff --git a/library/cpp/bucket_quoter/ut/ya.make b/library/cpp/bucket_quoter/ut/ya.make
new file mode 100644
index 0000000000..774b85e623
--- /dev/null
+++ b/library/cpp/bucket_quoter/ut/ya.make
@@ -0,0 +1,18 @@
+UNITTEST()
+
+OWNER(
+ g:kikimr
+ svc
+)
+
+FORK_SUBTESTS()
+SRCS(
+ main.cpp
+ test_namespace.cpp
+)
+PEERDIR(
+ library/cpp/bucket_quoter
+ library/cpp/getopt
+)
+
+END()
diff --git a/library/cpp/bucket_quoter/ya.make b/library/cpp/bucket_quoter/ya.make
new file mode 100644
index 0000000000..49c407b502
--- /dev/null
+++ b/library/cpp/bucket_quoter/ya.make
@@ -0,0 +1,11 @@
+LIBRARY()
+
+OWNER(serxa)
+
+SRCS(
+ bucket_quoter.cpp
+)
+
+END()
+
+RECURSE_FOR_TESTS(ut)