aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/bucket_quoter
diff options
context:
space:
mode:
authorserxa <serxa@yandex-team.ru>2022-02-10 16:49:08 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:49:08 +0300
commite5d4696304c6689379ac7ce334512404d4b7836c (patch)
tree5d5cb817648f650d76cf1076100726fd9b8448e8 /library/cpp/bucket_quoter
parentd6d7db348c2cc64e71243cab9940ee6778f4317d (diff)
downloadydb-e5d4696304c6689379ac7ce334512404d4b7836c.tar.gz
Restoring authorship annotation for <serxa@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/bucket_quoter')
-rw-r--r--library/cpp/bucket_quoter/bucket_quoter.h298
1 files changed, 149 insertions, 149 deletions
diff --git a/library/cpp/bucket_quoter/bucket_quoter.h b/library/cpp/bucket_quoter/bucket_quoter.h
index 74eeee2601..3d92ef8450 100644
--- a/library/cpp/bucket_quoter/bucket_quoter.h
+++ b/library/cpp/bucket_quoter/bucket_quoter.h
@@ -2,7 +2,7 @@
#include <util/datetime/base.h>
#include <util/system/mutex.h>
-#include <util/system/hp_timer.h>
+#include <util/system/hp_timer.h>
/* Token bucket.
* Makes flow of *inflow* units per second in average, with up to *capacity* bursts.
@@ -39,54 +39,54 @@
*/
-struct TInstantTimerMs {
- using TTime = TInstant;
- static constexpr ui64 Resolution = 1000ull; // milliseconds
- static TTime Now() {
- return TInstant::Now();
- }
- static ui64 Duration(TTime from, TTime to) {
- return (to - from).MilliSeconds();
- }
-};
-
-struct THPTimerUs {
- using TTime = NHPTimer::STime;
+struct TInstantTimerMs {
+ using TTime = TInstant;
+ static constexpr ui64 Resolution = 1000ull; // milliseconds
+ static TTime Now() {
+ return TInstant::Now();
+ }
+ static ui64 Duration(TTime from, TTime to) {
+ return (to - from).MilliSeconds();
+ }
+};
+
+struct THPTimerUs {
+ using TTime = NHPTimer::STime;
static constexpr ui64 Resolution = 1000000ull; // microseconds
- static TTime Now() {
- NHPTimer::STime ret;
- NHPTimer::GetTime(&ret);
- return ret;
- }
- static ui64 Duration(TTime from, TTime to) {
- i64 cycles = to - from;
- if (cycles > 0) {
- return ui64(double(cycles) * double(Resolution) / NHPTimer::GetClockRate());
- } else {
- return 0;
- }
- }
-};
-
-template <typename StatCounter, typename Lock = TMutex, typename Timer = TInstantTimerMs>
+ static TTime Now() {
+ NHPTimer::STime ret;
+ NHPTimer::GetTime(&ret);
+ return ret;
+ }
+ static ui64 Duration(TTime from, TTime to) {
+ i64 cycles = to - from;
+ if (cycles > 0) {
+ return ui64(double(cycles) * double(Resolution) / NHPTimer::GetClockRate());
+ } else {
+ return 0;
+ }
+ }
+};
+
+template <typename StatCounter, typename Lock = TMutex, typename Timer = TInstantTimerMs>
class TBucketQuoter {
public:
- using TTime = typename Timer::TTime;
-
- struct TResult {
- i64 Before;
- i64 After;
- ui64 Seqno;
- };
-
+ using TTime = typename Timer::TTime;
+
+ struct TResult {
+ i64 Before;
+ i64 After;
+ ui64 Seqno;
+ };
+
/* fixed quota */
- TBucketQuoter(ui64 inflow, ui64 capacity, StatCounter* msgPassed = nullptr,
- StatCounter* bucketUnderflows = nullptr, StatCounter* tokensUsed = nullptr,
+ TBucketQuoter(ui64 inflow, ui64 capacity, StatCounter* msgPassed = nullptr,
+ StatCounter* bucketUnderflows = nullptr, StatCounter* tokensUsed = nullptr,
StatCounter* usecWaited = nullptr, bool fill = false, StatCounter* aggregateInflow = nullptr)
: MsgPassed(msgPassed)
, BucketUnderflows(bucketUnderflows)
, TokensUsed(tokensUsed)
- , UsecWaited(usecWaited)
+ , UsecWaited(usecWaited)
, AggregateInflow(aggregateInflow)
, Bucket(fill ? capacity : 0)
, LastAdd(Timer::Now())
@@ -99,13 +99,13 @@ public:
}
/* adjustable quotas */
- TBucketQuoter(TAtomic* inflow, TAtomic* capacity, StatCounter* msgPassed = nullptr,
- StatCounter* bucketUnderflows = nullptr, StatCounter* tokensUsed = nullptr,
+ TBucketQuoter(TAtomic* inflow, TAtomic* capacity, StatCounter* msgPassed = nullptr,
+ StatCounter* bucketUnderflows = nullptr, StatCounter* tokensUsed = nullptr,
StatCounter* usecWaited = nullptr, bool fill = false, StatCounter* aggregateInflow = nullptr)
: MsgPassed(msgPassed)
, BucketUnderflows(bucketUnderflows)
, TokensUsed(tokensUsed)
- , UsecWaited(usecWaited)
+ , UsecWaited(usecWaited)
, AggregateInflow(aggregateInflow)
, Bucket(fill ? AtomicGet(*capacity) : 0)
, LastAdd(Timer::Now())
@@ -116,22 +116,22 @@ public:
}
bool IsAvail() {
- TGuard<Lock> g(BucketMutex);
- FillBucket();
- if (Bucket < 0) {
- if (BucketUnderflows) {
- (*BucketUnderflows)++;
- }
- }
- return (Bucket >= 0);
- }
-
- bool IsAvail(TResult& res) {
- TGuard<Lock> g(BucketMutex);
- res.Before = Bucket;
+ TGuard<Lock> g(BucketMutex);
+ FillBucket();
+ if (Bucket < 0) {
+ if (BucketUnderflows) {
+ (*BucketUnderflows)++;
+ }
+ }
+ return (Bucket >= 0);
+ }
+
+ bool IsAvail(TResult& res) {
+ TGuard<Lock> g(BucketMutex);
+ res.Before = Bucket;
FillBucket();
- res.After = Bucket;
- res.Seqno = ++Seqno;
+ res.After = Bucket;
+ res.Seqno = ++Seqno;
if (Bucket < 0) {
if (BucketUnderflows) {
(*BucketUnderflows)++;
@@ -140,58 +140,58 @@ public:
return (Bucket >= 0);
}
- ui64 GetAvail() {
- TGuard<Lock> g(BucketMutex);
- FillBucket();
- return Max<i64>(0, Bucket);
- }
-
- ui64 GetAvail(TResult& res) {
- TGuard<Lock> g(BucketMutex);
- res.Before = Bucket;
- FillBucket();
- res.After = Bucket;
- res.Seqno = ++Seqno;
- return Max<i64>(0, Bucket);
- }
-
+ ui64 GetAvail() {
+ TGuard<Lock> g(BucketMutex);
+ FillBucket();
+ return Max<i64>(0, Bucket);
+ }
+
+ ui64 GetAvail(TResult& res) {
+ TGuard<Lock> g(BucketMutex);
+ res.Before = Bucket;
+ FillBucket();
+ res.After = Bucket;
+ res.Seqno = ++Seqno;
+ return Max<i64>(0, Bucket);
+ }
+
void Use(ui64 tokens, bool sleep = false) {
- TGuard<Lock> g(BucketMutex);
- UseNoLock(tokens, sleep);
+ TGuard<Lock> g(BucketMutex);
+ UseNoLock(tokens, sleep);
+ }
+
+ void Use(ui64 tokens, TResult& res, bool sleep = false) {
+ TGuard<Lock> g(BucketMutex);
+ res.Before = Bucket;
+ UseNoLock(tokens, sleep);
+ res.After = Bucket;
+ res.Seqno = ++Seqno;
}
- void Use(ui64 tokens, TResult& res, bool sleep = false) {
- TGuard<Lock> g(BucketMutex);
- res.Before = Bucket;
- UseNoLock(tokens, sleep);
- res.After = Bucket;
- res.Seqno = ++Seqno;
- }
-
- i64 UseAndFill(ui64 tokens) {
- TGuard<Lock> g(BucketMutex);
- UseNoLock(tokens);
+ i64 UseAndFill(ui64 tokens) {
+ TGuard<Lock> g(BucketMutex);
+ UseNoLock(tokens);
+ FillBucket();
+ return Bucket;
+ }
+
+ void Add(ui64 tokens) {
+ TGuard<Lock> g(BucketMutex);
+ AddNoLock(tokens);
+ }
+
+ void Add(ui64 tokens, TResult& res) {
+ TGuard<Lock> g(BucketMutex);
+ res.Before = Bucket;
+ AddNoLock(tokens);
+ res.After = Bucket;
+ res.Seqno = ++Seqno;
+ }
+
+ ui32 GetWaitTime() {
+ TGuard<Lock> g(BucketMutex);
+
FillBucket();
- return Bucket;
- }
-
- void Add(ui64 tokens) {
- TGuard<Lock> g(BucketMutex);
- AddNoLock(tokens);
- }
-
- void Add(ui64 tokens, TResult& res) {
- TGuard<Lock> g(BucketMutex);
- res.Before = Bucket;
- AddNoLock(tokens);
- res.After = Bucket;
- res.Seqno = ++Seqno;
- }
-
- ui32 GetWaitTime() {
- TGuard<Lock> g(BucketMutex);
-
- FillBucket();
if (Bucket >= 0) {
return 0;
}
@@ -200,37 +200,37 @@ public:
return usec;
}
- ui32 GetWaitTime(TResult& res) {
- TGuard<Lock> g(BucketMutex);
- res.Before = Bucket;
- FillBucket();
- res.After = Bucket;
- res.Seqno = ++Seqno;
- if (Bucket >= 0) {
- return 0;
- }
- ui32 usec = (-Bucket * 1000000) / (*InflowTokensPerSecond);
- return usec;
- }
-
+ ui32 GetWaitTime(TResult& res) {
+ TGuard<Lock> g(BucketMutex);
+ res.Before = Bucket;
+ FillBucket();
+ res.After = Bucket;
+ res.Seqno = ++Seqno;
+ if (Bucket >= 0) {
+ return 0;
+ }
+ ui32 usec = (-Bucket * 1000000) / (*InflowTokensPerSecond);
+ return usec;
+ }
+
void Sleep() {
while (!IsAvail()) {
ui32 delay = GetWaitTime();
if (delay != 0) {
usleep(delay);
- if (UsecWaited) {
- (*UsecWaited) += delay;
- }
+ if (UsecWaited) {
+ (*UsecWaited) += delay;
+ }
}
}
}
private:
void FillBucket() {
- TTime now = Timer::Now();
+ TTime now = Timer::Now();
- ui64 elapsed = Timer::Duration(LastAdd, now);
- if (*InflowTokensPerSecond * elapsed >= Timer::Resolution) {
+ ui64 elapsed = Timer::Duration(LastAdd, now);
+ if (*InflowTokensPerSecond * elapsed >= Timer::Resolution) {
ui64 inflow = *InflowTokensPerSecond * elapsed / Timer::Resolution;
if (AggregateInflow) {
*AggregateInflow += inflow;
@@ -244,35 +244,35 @@ private:
}
}
- void UseNoLock(ui64 tokens, bool sleep = false) {
- if (sleep)
- Sleep();
- Bucket -= tokens;
- if (TokensUsed) {
- (*TokensUsed) += tokens;
- }
- if (MsgPassed) {
- (*MsgPassed)++;
- }
- }
-
- void AddNoLock(ui64 tokens) {
- Bucket += tokens;
- if (Bucket > *BucketTokensCapacity) {
- Bucket = *BucketTokensCapacity;
- }
- }
-
+ void UseNoLock(ui64 tokens, bool sleep = false) {
+ if (sleep)
+ Sleep();
+ Bucket -= tokens;
+ if (TokensUsed) {
+ (*TokensUsed) += tokens;
+ }
+ if (MsgPassed) {
+ (*MsgPassed)++;
+ }
+ }
+
+ void AddNoLock(ui64 tokens) {
+ Bucket += tokens;
+ if (Bucket > *BucketTokensCapacity) {
+ Bucket = *BucketTokensCapacity;
+ }
+ }
+
StatCounter* MsgPassed;
StatCounter* BucketUnderflows;
StatCounter* TokensUsed;
- StatCounter* UsecWaited;
+ StatCounter* UsecWaited;
StatCounter* AggregateInflow;
i64 Bucket;
- TTime LastAdd;
- Lock BucketMutex;
- ui64 Seqno = 0;
+ TTime LastAdd;
+ Lock BucketMutex;
+ ui64 Seqno = 0;
TAtomic* InflowTokensPerSecond;
TAtomic* BucketTokensCapacity;