aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/bucket_quoter/bucket_quoter.h
diff options
context:
space:
mode:
authorblaze <blaze@yandex-team.ru>2022-02-10 16:50:31 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:50:31 +0300
commit6813864abdb5ce336cde7a2e5cd80232ba54eef1 (patch)
tree4d210665182fb648da2838c38dba04bab6878dc1 /library/cpp/bucket_quoter/bucket_quoter.h
parent6e1e62cdffc32768898ccdfd24e046d8b929a45b (diff)
downloadydb-6813864abdb5ce336cde7a2e5cd80232ba54eef1.tar.gz
Restoring authorship annotation for <blaze@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/bucket_quoter/bucket_quoter.h')
-rw-r--r--library/cpp/bucket_quoter/bucket_quoter.h224
1 files changed, 112 insertions, 112 deletions
diff --git a/library/cpp/bucket_quoter/bucket_quoter.h b/library/cpp/bucket_quoter/bucket_quoter.h
index 3d92ef8450..03bf7d7641 100644
--- a/library/cpp/bucket_quoter/bucket_quoter.h
+++ b/library/cpp/bucket_quoter/bucket_quoter.h
@@ -1,44 +1,44 @@
-#pragma once
-
-#include <util/datetime/base.h>
-#include <util/system/mutex.h>
+#pragma once
+
+#include <util/datetime/base.h>
+#include <util/system/mutex.h>
#include <util/system/hp_timer.h>
-
-/* Token bucket.
- * Makes flow of *inflow* units per second in average, with up to *capacity* bursts.
- * Do not use for STRICT flow control.
- */
-
-/* samples: create and use quoter sending 1000 bytes per second on average,
- with up to 60 seconds quota buildup.
-
- TBucketQuoter quoter(1000, 60000, NULL, NULL, NULL);
-
- for (;;) {
- T *msg = get_message();
-
- quoter.Sleep();
- quoter.Use(msg->GetSize());
- send_message(msg);
- }
-
- ----------------------------
-
- TBucketQuoter quoter(1000, 60000, NULL, NULL, NULL);
-
- for (;;) {
- T *msg = get_message();
-
- while (! quoter.IsAvail()) {
- // do something else
- }
-
- quoter.Use(msg->GetSize());
- send_message(msg);
- }
-
-*/
-
+
+/* Token bucket.
+ * Makes flow of *inflow* units per second in average, with up to *capacity* bursts.
+ * Do not use for STRICT flow control.
+ */
+
+/* samples: create and use quoter sending 1000 bytes per second on average,
+ with up to 60 seconds quota buildup.
+
+ TBucketQuoter quoter(1000, 60000, NULL, NULL, NULL);
+
+ for (;;) {
+ T *msg = get_message();
+
+ quoter.Sleep();
+ quoter.Use(msg->GetSize());
+ send_message(msg);
+ }
+
+ ----------------------------
+
+ TBucketQuoter quoter(1000, 60000, NULL, NULL, NULL);
+
+ for (;;) {
+ T *msg = get_message();
+
+ while (! quoter.IsAvail()) {
+ // do something else
+ }
+
+ quoter.Use(msg->GetSize());
+ send_message(msg);
+ }
+
+*/
+
struct TInstantTimerMs {
using TTime = TInstant;
static constexpr ui64 Resolution = 1000ull; // milliseconds
@@ -69,8 +69,8 @@ struct THPTimerUs {
};
template <typename StatCounter, typename Lock = TMutex, typename Timer = TInstantTimerMs>
-class TBucketQuoter {
-public:
+class TBucketQuoter {
+public:
using TTime = typename Timer::TTime;
struct TResult {
@@ -79,43 +79,43 @@ public:
ui64 Seqno;
};
- /* fixed quota */
+ /* fixed quota */
TBucketQuoter(ui64 inflow, ui64 capacity, StatCounter* msgPassed = nullptr,
StatCounter* bucketUnderflows = nullptr, StatCounter* tokensUsed = nullptr,
StatCounter* usecWaited = nullptr, bool fill = false, StatCounter* aggregateInflow = nullptr)
- : MsgPassed(msgPassed)
- , BucketUnderflows(bucketUnderflows)
- , TokensUsed(tokensUsed)
+ : MsgPassed(msgPassed)
+ , BucketUnderflows(bucketUnderflows)
+ , TokensUsed(tokensUsed)
, UsecWaited(usecWaited)
, AggregateInflow(aggregateInflow)
, Bucket(fill ? capacity : 0)
, LastAdd(Timer::Now())
- , InflowTokensPerSecond(&FixedInflow)
- , BucketTokensCapacity(&FixedCapacity)
- , FixedInflow(inflow)
- , FixedCapacity(capacity)
- {
- /* no-op */
- }
-
- /* adjustable quotas */
+ , InflowTokensPerSecond(&FixedInflow)
+ , BucketTokensCapacity(&FixedCapacity)
+ , FixedInflow(inflow)
+ , FixedCapacity(capacity)
+ {
+ /* no-op */
+ }
+
+ /* adjustable quotas */
TBucketQuoter(TAtomic* inflow, TAtomic* capacity, StatCounter* msgPassed = nullptr,
StatCounter* bucketUnderflows = nullptr, StatCounter* tokensUsed = nullptr,
StatCounter* usecWaited = nullptr, bool fill = false, StatCounter* aggregateInflow = nullptr)
- : MsgPassed(msgPassed)
- , BucketUnderflows(bucketUnderflows)
- , TokensUsed(tokensUsed)
+ : MsgPassed(msgPassed)
+ , BucketUnderflows(bucketUnderflows)
+ , TokensUsed(tokensUsed)
, UsecWaited(usecWaited)
, AggregateInflow(aggregateInflow)
, Bucket(fill ? AtomicGet(*capacity) : 0)
, LastAdd(Timer::Now())
- , InflowTokensPerSecond(inflow)
- , BucketTokensCapacity(capacity)
- {
- /* no-op */
- }
-
- bool IsAvail() {
+ , InflowTokensPerSecond(inflow)
+ , BucketTokensCapacity(capacity)
+ {
+ /* no-op */
+ }
+
+ bool IsAvail() {
TGuard<Lock> g(BucketMutex);
FillBucket();
if (Bucket < 0) {
@@ -125,21 +125,21 @@ public:
}
return (Bucket >= 0);
}
-
+
bool IsAvail(TResult& res) {
TGuard<Lock> g(BucketMutex);
res.Before = Bucket;
- FillBucket();
+ FillBucket();
res.After = Bucket;
res.Seqno = ++Seqno;
- if (Bucket < 0) {
- if (BucketUnderflows) {
- (*BucketUnderflows)++;
- }
- }
- return (Bucket >= 0);
- }
-
+ if (Bucket < 0) {
+ if (BucketUnderflows) {
+ (*BucketUnderflows)++;
+ }
+ }
+ return (Bucket >= 0);
+ }
+
ui64 GetAvail() {
TGuard<Lock> g(BucketMutex);
FillBucket();
@@ -158,8 +158,8 @@ public:
void Use(ui64 tokens, bool sleep = false) {
TGuard<Lock> g(BucketMutex);
UseNoLock(tokens, sleep);
- }
-
+ }
+
void Use(ui64 tokens, TResult& res, bool sleep = false) {
TGuard<Lock> g(BucketMutex);
res.Before = Bucket;
@@ -167,11 +167,11 @@ public:
res.After = Bucket;
res.Seqno = ++Seqno;
}
-
+
i64 UseAndFill(ui64 tokens) {
TGuard<Lock> g(BucketMutex);
UseNoLock(tokens);
- FillBucket();
+ FillBucket();
return Bucket;
}
@@ -192,14 +192,14 @@ public:
TGuard<Lock> g(BucketMutex);
FillBucket();
- if (Bucket >= 0) {
- return 0;
- }
-
- ui32 usec = (-Bucket * 1000000) / (*InflowTokensPerSecond);
- return usec;
- }
-
+ if (Bucket >= 0) {
+ return 0;
+ }
+
+ ui32 usec = (-Bucket * 1000000) / (*InflowTokensPerSecond);
+ return usec;
+ }
+
ui32 GetWaitTime(TResult& res) {
TGuard<Lock> g(BucketMutex);
res.Before = Bucket;
@@ -213,22 +213,22 @@ public:
return usec;
}
- void Sleep() {
+ void Sleep() {
while (!IsAvail()) {
- ui32 delay = GetWaitTime();
- if (delay != 0) {
- usleep(delay);
+ ui32 delay = GetWaitTime();
+ if (delay != 0) {
+ usleep(delay);
if (UsecWaited) {
(*UsecWaited) += delay;
}
- }
- }
- }
-
-private:
- void FillBucket() {
+ }
+ }
+ }
+
+private:
+ void FillBucket() {
TTime now = Timer::Now();
-
+
ui64 elapsed = Timer::Duration(LastAdd, now);
if (*InflowTokensPerSecond * elapsed >= Timer::Resolution) {
ui64 inflow = *InflowTokensPerSecond * elapsed / Timer::Resolution;
@@ -236,14 +236,14 @@ private:
*AggregateInflow += inflow;
}
Bucket += inflow;
- if (Bucket > *BucketTokensCapacity) {
- Bucket = *BucketTokensCapacity;
- }
-
- LastAdd = now;
- }
- }
-
+ if (Bucket > *BucketTokensCapacity) {
+ Bucket = *BucketTokensCapacity;
+ }
+
+ LastAdd = now;
+ }
+ }
+
void UseNoLock(ui64 tokens, bool sleep = false) {
if (sleep)
Sleep();
@@ -268,14 +268,14 @@ private:
StatCounter* TokensUsed;
StatCounter* UsecWaited;
StatCounter* AggregateInflow;
-
- i64 Bucket;
+
+ i64 Bucket;
TTime LastAdd;
Lock BucketMutex;
ui64 Seqno = 0;
-
+
TAtomic* InflowTokensPerSecond;
TAtomic* BucketTokensCapacity;
- TAtomic FixedInflow;
- TAtomic FixedCapacity;
-};
+ TAtomic FixedInflow;
+ TAtomic FixedCapacity;
+};