diff options
author | blaze <blaze@yandex-team.ru> | 2022-02-10 16:50:31 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:50:31 +0300 |
commit | 6813864abdb5ce336cde7a2e5cd80232ba54eef1 (patch) | |
tree | 4d210665182fb648da2838c38dba04bab6878dc1 /library/cpp/bucket_quoter/bucket_quoter.h | |
parent | 6e1e62cdffc32768898ccdfd24e046d8b929a45b (diff) | |
download | ydb-6813864abdb5ce336cde7a2e5cd80232ba54eef1.tar.gz |
Restoring authorship annotation for <blaze@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/bucket_quoter/bucket_quoter.h')
-rw-r--r-- | library/cpp/bucket_quoter/bucket_quoter.h | 224 |
1 files changed, 112 insertions, 112 deletions
diff --git a/library/cpp/bucket_quoter/bucket_quoter.h b/library/cpp/bucket_quoter/bucket_quoter.h index 3d92ef8450..03bf7d7641 100644 --- a/library/cpp/bucket_quoter/bucket_quoter.h +++ b/library/cpp/bucket_quoter/bucket_quoter.h @@ -1,44 +1,44 @@ -#pragma once - -#include <util/datetime/base.h> -#include <util/system/mutex.h> +#pragma once + +#include <util/datetime/base.h> +#include <util/system/mutex.h> #include <util/system/hp_timer.h> - -/* Token bucket. - * Makes flow of *inflow* units per second in average, with up to *capacity* bursts. - * Do not use for STRICT flow control. - */ - -/* samples: create and use quoter sending 1000 bytes per second on average, - with up to 60 seconds quota buildup. - - TBucketQuoter quoter(1000, 60000, NULL, NULL, NULL); - - for (;;) { - T *msg = get_message(); - - quoter.Sleep(); - quoter.Use(msg->GetSize()); - send_message(msg); - } - - ---------------------------- - - TBucketQuoter quoter(1000, 60000, NULL, NULL, NULL); - - for (;;) { - T *msg = get_message(); - - while (! quoter.IsAvail()) { - // do something else - } - - quoter.Use(msg->GetSize()); - send_message(msg); - } - -*/ - + +/* Token bucket. + * Makes flow of *inflow* units per second in average, with up to *capacity* bursts. + * Do not use for STRICT flow control. + */ + +/* samples: create and use quoter sending 1000 bytes per second on average, + with up to 60 seconds quota buildup. + + TBucketQuoter quoter(1000, 60000, NULL, NULL, NULL); + + for (;;) { + T *msg = get_message(); + + quoter.Sleep(); + quoter.Use(msg->GetSize()); + send_message(msg); + } + + ---------------------------- + + TBucketQuoter quoter(1000, 60000, NULL, NULL, NULL); + + for (;;) { + T *msg = get_message(); + + while (! quoter.IsAvail()) { + // do something else + } + + quoter.Use(msg->GetSize()); + send_message(msg); + } + +*/ + struct TInstantTimerMs { using TTime = TInstant; static constexpr ui64 Resolution = 1000ull; // milliseconds @@ -69,8 +69,8 @@ struct THPTimerUs { }; template <typename StatCounter, typename Lock = TMutex, typename Timer = TInstantTimerMs> -class TBucketQuoter { -public: +class TBucketQuoter { +public: using TTime = typename Timer::TTime; struct TResult { @@ -79,43 +79,43 @@ public: ui64 Seqno; }; - /* fixed quota */ + /* fixed quota */ TBucketQuoter(ui64 inflow, ui64 capacity, StatCounter* msgPassed = nullptr, StatCounter* bucketUnderflows = nullptr, StatCounter* tokensUsed = nullptr, StatCounter* usecWaited = nullptr, bool fill = false, StatCounter* aggregateInflow = nullptr) - : MsgPassed(msgPassed) - , BucketUnderflows(bucketUnderflows) - , TokensUsed(tokensUsed) + : MsgPassed(msgPassed) + , BucketUnderflows(bucketUnderflows) + , TokensUsed(tokensUsed) , UsecWaited(usecWaited) , AggregateInflow(aggregateInflow) , Bucket(fill ? capacity : 0) , LastAdd(Timer::Now()) - , InflowTokensPerSecond(&FixedInflow) - , BucketTokensCapacity(&FixedCapacity) - , FixedInflow(inflow) - , FixedCapacity(capacity) - { - /* no-op */ - } - - /* adjustable quotas */ + , InflowTokensPerSecond(&FixedInflow) + , BucketTokensCapacity(&FixedCapacity) + , FixedInflow(inflow) + , FixedCapacity(capacity) + { + /* no-op */ + } + + /* adjustable quotas */ TBucketQuoter(TAtomic* inflow, TAtomic* capacity, StatCounter* msgPassed = nullptr, StatCounter* bucketUnderflows = nullptr, StatCounter* tokensUsed = nullptr, StatCounter* usecWaited = nullptr, bool fill = false, StatCounter* aggregateInflow = nullptr) - : MsgPassed(msgPassed) - , BucketUnderflows(bucketUnderflows) - , TokensUsed(tokensUsed) + : MsgPassed(msgPassed) + , BucketUnderflows(bucketUnderflows) + , TokensUsed(tokensUsed) , UsecWaited(usecWaited) , AggregateInflow(aggregateInflow) , Bucket(fill ? AtomicGet(*capacity) : 0) , LastAdd(Timer::Now()) - , InflowTokensPerSecond(inflow) - , BucketTokensCapacity(capacity) - { - /* no-op */ - } - - bool IsAvail() { + , InflowTokensPerSecond(inflow) + , BucketTokensCapacity(capacity) + { + /* no-op */ + } + + bool IsAvail() { TGuard<Lock> g(BucketMutex); FillBucket(); if (Bucket < 0) { @@ -125,21 +125,21 @@ public: } return (Bucket >= 0); } - + bool IsAvail(TResult& res) { TGuard<Lock> g(BucketMutex); res.Before = Bucket; - FillBucket(); + FillBucket(); res.After = Bucket; res.Seqno = ++Seqno; - if (Bucket < 0) { - if (BucketUnderflows) { - (*BucketUnderflows)++; - } - } - return (Bucket >= 0); - } - + if (Bucket < 0) { + if (BucketUnderflows) { + (*BucketUnderflows)++; + } + } + return (Bucket >= 0); + } + ui64 GetAvail() { TGuard<Lock> g(BucketMutex); FillBucket(); @@ -158,8 +158,8 @@ public: void Use(ui64 tokens, bool sleep = false) { TGuard<Lock> g(BucketMutex); UseNoLock(tokens, sleep); - } - + } + void Use(ui64 tokens, TResult& res, bool sleep = false) { TGuard<Lock> g(BucketMutex); res.Before = Bucket; @@ -167,11 +167,11 @@ public: res.After = Bucket; res.Seqno = ++Seqno; } - + i64 UseAndFill(ui64 tokens) { TGuard<Lock> g(BucketMutex); UseNoLock(tokens); - FillBucket(); + FillBucket(); return Bucket; } @@ -192,14 +192,14 @@ public: TGuard<Lock> g(BucketMutex); FillBucket(); - if (Bucket >= 0) { - return 0; - } - - ui32 usec = (-Bucket * 1000000) / (*InflowTokensPerSecond); - return usec; - } - + if (Bucket >= 0) { + return 0; + } + + ui32 usec = (-Bucket * 1000000) / (*InflowTokensPerSecond); + return usec; + } + ui32 GetWaitTime(TResult& res) { TGuard<Lock> g(BucketMutex); res.Before = Bucket; @@ -213,22 +213,22 @@ public: return usec; } - void Sleep() { + void Sleep() { while (!IsAvail()) { - ui32 delay = GetWaitTime(); - if (delay != 0) { - usleep(delay); + ui32 delay = GetWaitTime(); + if (delay != 0) { + usleep(delay); if (UsecWaited) { (*UsecWaited) += delay; } - } - } - } - -private: - void FillBucket() { + } + } + } + +private: + void FillBucket() { TTime now = Timer::Now(); - + ui64 elapsed = Timer::Duration(LastAdd, now); if (*InflowTokensPerSecond * elapsed >= Timer::Resolution) { ui64 inflow = *InflowTokensPerSecond * elapsed / Timer::Resolution; @@ -236,14 +236,14 @@ private: *AggregateInflow += inflow; } Bucket += inflow; - if (Bucket > *BucketTokensCapacity) { - Bucket = *BucketTokensCapacity; - } - - LastAdd = now; - } - } - + if (Bucket > *BucketTokensCapacity) { + Bucket = *BucketTokensCapacity; + } + + LastAdd = now; + } + } + void UseNoLock(ui64 tokens, bool sleep = false) { if (sleep) Sleep(); @@ -268,14 +268,14 @@ private: StatCounter* TokensUsed; StatCounter* UsecWaited; StatCounter* AggregateInflow; - - i64 Bucket; + + i64 Bucket; TTime LastAdd; Lock BucketMutex; ui64 Seqno = 0; - + TAtomic* InflowTokensPerSecond; TAtomic* BucketTokensCapacity; - TAtomic FixedInflow; - TAtomic FixedCapacity; -}; + TAtomic FixedInflow; + TAtomic FixedCapacity; +}; |