diff options
author | single <single@yandex-team.ru> | 2022-02-10 16:50:29 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:50:29 +0300 |
commit | 8ae96df130bbede609c3504aa9af1bc6ff5361b3 (patch) | |
tree | 4751832974bd75ca721269aa54faa15d76032dfb /library/cpp/messagebus/misc/tokenquota.h | |
parent | 5d4e7b7c923852e0f6398791ec98a60cf9faab46 (diff) | |
download | ydb-8ae96df130bbede609c3504aa9af1bc6ff5361b3.tar.gz |
Restoring authorship annotation for <single@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/messagebus/misc/tokenquota.h')
-rw-r--r-- | library/cpp/messagebus/misc/tokenquota.h | 126 |
1 files changed, 63 insertions, 63 deletions
diff --git a/library/cpp/messagebus/misc/tokenquota.h b/library/cpp/messagebus/misc/tokenquota.h index 190547fa54..954cf0f0d7 100644 --- a/library/cpp/messagebus/misc/tokenquota.h +++ b/library/cpp/messagebus/misc/tokenquota.h @@ -1,83 +1,83 @@ -#pragma once - -#include <util/system/atomic.h> - -namespace NBus { - /* Consumer and feeder quota model impl. - - Consumer thread only calls: - Acquire(), fetches tokens for usage from bucket; - Consume(), eats given amount of tokens, must not - be greater than Value() items; - - Other threads (feeders) calls: - Return(), put used tokens back to bucket; - */ - - class TTokenQuota { - public: - TTokenQuota(bool enabled, size_t tokens, size_t wake) - : Enabled(tokens > 0 ? enabled : false) - , Acquired(0) - , WakeLev(wake < 1 ? Max<size_t>(1, tokens / 2) : 0) - , Tokens_(tokens) +#pragma once + +#include <util/system/atomic.h> + +namespace NBus { + /* Consumer and feeder quota model impl. + + Consumer thread only calls: + Acquire(), fetches tokens for usage from bucket; + Consume(), eats given amount of tokens, must not + be greater than Value() items; + + Other threads (feeders) calls: + Return(), put used tokens back to bucket; + */ + + class TTokenQuota { + public: + TTokenQuota(bool enabled, size_t tokens, size_t wake) + : Enabled(tokens > 0 ? enabled : false) + , Acquired(0) + , WakeLev(wake < 1 ? Max<size_t>(1, tokens / 2) : 0) + , Tokens_(tokens) { Y_UNUSED(padd_); } - + bool Acquire(TAtomic level = 1, bool force = false) { level = Max(TAtomicBase(level), TAtomicBase(1)); - + if (Enabled && (Acquired < level || force)) { Acquired += AtomicSwap(&Tokens_, 0); - } - - return !Enabled || Acquired >= level; - } - + } + + return !Enabled || Acquired >= level; + } + void Consume(size_t items) { if (Enabled) { Y_ASSERT(Acquired >= TAtomicBase(items)); - - Acquired -= items; - } - } - + + Acquired -= items; + } + } + bool Return(size_t items_) noexcept { if (!Enabled || items_ == 0) - return false; - - const TAtomic items = items_; - const TAtomic value = AtomicAdd(Tokens_, items); - - return (value - items < WakeLev && value >= WakeLev); - } - + return false; + + const TAtomic items = items_; + const TAtomic value = AtomicAdd(Tokens_, items); + + return (value - items < WakeLev && value >= WakeLev); + } + bool IsEnabled() const noexcept { - return Enabled; - } - + return Enabled; + } + bool IsAboveWake() const noexcept { - return !Enabled || (WakeLev <= AtomicGet(Tokens_)); - } - + return !Enabled || (WakeLev <= AtomicGet(Tokens_)); + } + size_t Tokens() const noexcept { - return Acquired + AtomicGet(Tokens_); - } - + return Acquired + AtomicGet(Tokens_); + } + size_t Check(const TAtomic level) const noexcept { - return !Enabled || level <= Acquired; - } - - private: + return !Enabled || level <= Acquired; + } + + private: bool Enabled; TAtomicBase Acquired; - const TAtomicBase WakeLev; + const TAtomicBase WakeLev; TAtomic Tokens_; - - /* This padd requires for align Tokens_ member on its own - CPU cacheline. */ - + + /* This padd requires for align Tokens_ member on its own + CPU cacheline. */ + ui64 padd_; - }; -} + }; +} |