diff options
author | serxa <serxa@yandex-team.ru> | 2022-02-10 16:49:08 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:49:08 +0300 |
commit | d6d7db348c2cc64e71243cab9940ee6778f4317d (patch) | |
tree | bac67f42a02f9368eb4d329f5d79b77d0a6adc18 /library/cpp/bucket_quoter | |
parent | 8d57b69dee81198a59c39e64704f7dc9f04b4fbf (diff) | |
download | ydb-d6d7db348c2cc64e71243cab9940ee6778f4317d.tar.gz |
Restoring authorship annotation for <serxa@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/bucket_quoter')
-rw-r--r-- | library/cpp/bucket_quoter/bucket_quoter.h | 298 |
1 files changed, 149 insertions, 149 deletions
diff --git a/library/cpp/bucket_quoter/bucket_quoter.h b/library/cpp/bucket_quoter/bucket_quoter.h index 3d92ef8450..74eeee2601 100644 --- a/library/cpp/bucket_quoter/bucket_quoter.h +++ b/library/cpp/bucket_quoter/bucket_quoter.h @@ -2,7 +2,7 @@ #include <util/datetime/base.h> #include <util/system/mutex.h> -#include <util/system/hp_timer.h> +#include <util/system/hp_timer.h> /* Token bucket. * Makes flow of *inflow* units per second in average, with up to *capacity* bursts. @@ -39,54 +39,54 @@ */ -struct TInstantTimerMs { - using TTime = TInstant; - static constexpr ui64 Resolution = 1000ull; // milliseconds - static TTime Now() { - return TInstant::Now(); - } - static ui64 Duration(TTime from, TTime to) { - return (to - from).MilliSeconds(); - } -}; - -struct THPTimerUs { - using TTime = NHPTimer::STime; +struct TInstantTimerMs { + using TTime = TInstant; + static constexpr ui64 Resolution = 1000ull; // milliseconds + static TTime Now() { + return TInstant::Now(); + } + static ui64 Duration(TTime from, TTime to) { + return (to - from).MilliSeconds(); + } +}; + +struct THPTimerUs { + using TTime = NHPTimer::STime; static constexpr ui64 Resolution = 1000000ull; // microseconds - static TTime Now() { - NHPTimer::STime ret; - NHPTimer::GetTime(&ret); - return ret; - } - static ui64 Duration(TTime from, TTime to) { - i64 cycles = to - from; - if (cycles > 0) { - return ui64(double(cycles) * double(Resolution) / NHPTimer::GetClockRate()); - } else { - return 0; - } - } -}; - -template <typename StatCounter, typename Lock = TMutex, typename Timer = TInstantTimerMs> + static TTime Now() { + NHPTimer::STime ret; + NHPTimer::GetTime(&ret); + return ret; + } + static ui64 Duration(TTime from, TTime to) { + i64 cycles = to - from; + if (cycles > 0) { + return ui64(double(cycles) * double(Resolution) / NHPTimer::GetClockRate()); + } else { + return 0; + } + } +}; + +template <typename StatCounter, typename Lock = TMutex, typename Timer = TInstantTimerMs> class TBucketQuoter { public: - using TTime = typename Timer::TTime; - - struct TResult { - i64 Before; - i64 After; - ui64 Seqno; - }; - + using TTime = typename Timer::TTime; + + struct TResult { + i64 Before; + i64 After; + ui64 Seqno; + }; + /* fixed quota */ - TBucketQuoter(ui64 inflow, ui64 capacity, StatCounter* msgPassed = nullptr, - StatCounter* bucketUnderflows = nullptr, StatCounter* tokensUsed = nullptr, + TBucketQuoter(ui64 inflow, ui64 capacity, StatCounter* msgPassed = nullptr, + StatCounter* bucketUnderflows = nullptr, StatCounter* tokensUsed = nullptr, StatCounter* usecWaited = nullptr, bool fill = false, StatCounter* aggregateInflow = nullptr) : MsgPassed(msgPassed) , BucketUnderflows(bucketUnderflows) , TokensUsed(tokensUsed) - , UsecWaited(usecWaited) + , UsecWaited(usecWaited) , AggregateInflow(aggregateInflow) , Bucket(fill ? capacity : 0) , LastAdd(Timer::Now()) @@ -99,13 +99,13 @@ public: } /* adjustable quotas */ - TBucketQuoter(TAtomic* inflow, TAtomic* capacity, StatCounter* msgPassed = nullptr, - StatCounter* bucketUnderflows = nullptr, StatCounter* tokensUsed = nullptr, + TBucketQuoter(TAtomic* inflow, TAtomic* capacity, StatCounter* msgPassed = nullptr, + StatCounter* bucketUnderflows = nullptr, StatCounter* tokensUsed = nullptr, StatCounter* usecWaited = nullptr, bool fill = false, StatCounter* aggregateInflow = nullptr) : MsgPassed(msgPassed) , BucketUnderflows(bucketUnderflows) , TokensUsed(tokensUsed) - , UsecWaited(usecWaited) + , UsecWaited(usecWaited) , AggregateInflow(aggregateInflow) , Bucket(fill ? AtomicGet(*capacity) : 0) , LastAdd(Timer::Now()) @@ -116,22 +116,22 @@ public: } bool IsAvail() { - TGuard<Lock> g(BucketMutex); - FillBucket(); - if (Bucket < 0) { - if (BucketUnderflows) { - (*BucketUnderflows)++; - } - } - return (Bucket >= 0); - } - - bool IsAvail(TResult& res) { - TGuard<Lock> g(BucketMutex); - res.Before = Bucket; + TGuard<Lock> g(BucketMutex); + FillBucket(); + if (Bucket < 0) { + if (BucketUnderflows) { + (*BucketUnderflows)++; + } + } + return (Bucket >= 0); + } + + bool IsAvail(TResult& res) { + TGuard<Lock> g(BucketMutex); + res.Before = Bucket; FillBucket(); - res.After = Bucket; - res.Seqno = ++Seqno; + res.After = Bucket; + res.Seqno = ++Seqno; if (Bucket < 0) { if (BucketUnderflows) { (*BucketUnderflows)++; @@ -140,58 +140,58 @@ public: return (Bucket >= 0); } - ui64 GetAvail() { - TGuard<Lock> g(BucketMutex); - FillBucket(); - return Max<i64>(0, Bucket); - } - - ui64 GetAvail(TResult& res) { - TGuard<Lock> g(BucketMutex); - res.Before = Bucket; - FillBucket(); - res.After = Bucket; - res.Seqno = ++Seqno; - return Max<i64>(0, Bucket); - } - + ui64 GetAvail() { + TGuard<Lock> g(BucketMutex); + FillBucket(); + return Max<i64>(0, Bucket); + } + + ui64 GetAvail(TResult& res) { + TGuard<Lock> g(BucketMutex); + res.Before = Bucket; + FillBucket(); + res.After = Bucket; + res.Seqno = ++Seqno; + return Max<i64>(0, Bucket); + } + void Use(ui64 tokens, bool sleep = false) { - TGuard<Lock> g(BucketMutex); - UseNoLock(tokens, sleep); - } - - void Use(ui64 tokens, TResult& res, bool sleep = false) { - TGuard<Lock> g(BucketMutex); - res.Before = Bucket; - UseNoLock(tokens, sleep); - res.After = Bucket; - res.Seqno = ++Seqno; + TGuard<Lock> g(BucketMutex); + UseNoLock(tokens, sleep); } - i64 UseAndFill(ui64 tokens) { - TGuard<Lock> g(BucketMutex); - UseNoLock(tokens); - FillBucket(); - return Bucket; - } - - void Add(ui64 tokens) { - TGuard<Lock> g(BucketMutex); - AddNoLock(tokens); - } - - void Add(ui64 tokens, TResult& res) { - TGuard<Lock> g(BucketMutex); - res.Before = Bucket; - AddNoLock(tokens); - res.After = Bucket; - res.Seqno = ++Seqno; - } - - ui32 GetWaitTime() { - TGuard<Lock> g(BucketMutex); - + void Use(ui64 tokens, TResult& res, bool sleep = false) { + TGuard<Lock> g(BucketMutex); + res.Before = Bucket; + UseNoLock(tokens, sleep); + res.After = Bucket; + res.Seqno = ++Seqno; + } + + i64 UseAndFill(ui64 tokens) { + TGuard<Lock> g(BucketMutex); + UseNoLock(tokens); FillBucket(); + return Bucket; + } + + void Add(ui64 tokens) { + TGuard<Lock> g(BucketMutex); + AddNoLock(tokens); + } + + void Add(ui64 tokens, TResult& res) { + TGuard<Lock> g(BucketMutex); + res.Before = Bucket; + AddNoLock(tokens); + res.After = Bucket; + res.Seqno = ++Seqno; + } + + ui32 GetWaitTime() { + TGuard<Lock> g(BucketMutex); + + FillBucket(); if (Bucket >= 0) { return 0; } @@ -200,37 +200,37 @@ public: return usec; } - ui32 GetWaitTime(TResult& res) { - TGuard<Lock> g(BucketMutex); - res.Before = Bucket; - FillBucket(); - res.After = Bucket; - res.Seqno = ++Seqno; - if (Bucket >= 0) { - return 0; - } - ui32 usec = (-Bucket * 1000000) / (*InflowTokensPerSecond); - return usec; - } - + ui32 GetWaitTime(TResult& res) { + TGuard<Lock> g(BucketMutex); + res.Before = Bucket; + FillBucket(); + res.After = Bucket; + res.Seqno = ++Seqno; + if (Bucket >= 0) { + return 0; + } + ui32 usec = (-Bucket * 1000000) / (*InflowTokensPerSecond); + return usec; + } + void Sleep() { while (!IsAvail()) { ui32 delay = GetWaitTime(); if (delay != 0) { usleep(delay); - if (UsecWaited) { - (*UsecWaited) += delay; - } + if (UsecWaited) { + (*UsecWaited) += delay; + } } } } private: void FillBucket() { - TTime now = Timer::Now(); + TTime now = Timer::Now(); - ui64 elapsed = Timer::Duration(LastAdd, now); - if (*InflowTokensPerSecond * elapsed >= Timer::Resolution) { + ui64 elapsed = Timer::Duration(LastAdd, now); + if (*InflowTokensPerSecond * elapsed >= Timer::Resolution) { ui64 inflow = *InflowTokensPerSecond * elapsed / Timer::Resolution; if (AggregateInflow) { *AggregateInflow += inflow; @@ -244,35 +244,35 @@ private: } } - void UseNoLock(ui64 tokens, bool sleep = false) { - if (sleep) - Sleep(); - Bucket -= tokens; - if (TokensUsed) { - (*TokensUsed) += tokens; - } - if (MsgPassed) { - (*MsgPassed)++; - } - } - - void AddNoLock(ui64 tokens) { - Bucket += tokens; - if (Bucket > *BucketTokensCapacity) { - Bucket = *BucketTokensCapacity; - } - } - + void UseNoLock(ui64 tokens, bool sleep = false) { + if (sleep) + Sleep(); + Bucket -= tokens; + if (TokensUsed) { + (*TokensUsed) += tokens; + } + if (MsgPassed) { + (*MsgPassed)++; + } + } + + void AddNoLock(ui64 tokens) { + Bucket += tokens; + if (Bucket > *BucketTokensCapacity) { + Bucket = *BucketTokensCapacity; + } + } + StatCounter* MsgPassed; StatCounter* BucketUnderflows; StatCounter* TokensUsed; - StatCounter* UsecWaited; + StatCounter* UsecWaited; StatCounter* AggregateInflow; i64 Bucket; - TTime LastAdd; - Lock BucketMutex; - ui64 Seqno = 0; + TTime LastAdd; + Lock BucketMutex; + ui64 Seqno = 0; TAtomic* InflowTokensPerSecond; TAtomic* BucketTokensCapacity; |