diff options
author | single <single@yandex-team.ru> | 2022-02-10 16:50:29 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:50:29 +0300 |
commit | 8ae96df130bbede609c3504aa9af1bc6ff5361b3 (patch) | |
tree | 4751832974bd75ca721269aa54faa15d76032dfb /library/cpp/messagebus/misc | |
parent | 5d4e7b7c923852e0f6398791ec98a60cf9faab46 (diff) | |
download | ydb-8ae96df130bbede609c3504aa9af1bc6ff5361b3.tar.gz |
Restoring authorship annotation for <single@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/messagebus/misc')
-rw-r--r-- | library/cpp/messagebus/misc/granup.h | 66 | ||||
-rw-r--r-- | library/cpp/messagebus/misc/tokenquota.h | 126 |
2 files changed, 96 insertions, 96 deletions
diff --git a/library/cpp/messagebus/misc/granup.h b/library/cpp/messagebus/misc/granup.h index 36ecfebc93..8b04aca597 100644 --- a/library/cpp/messagebus/misc/granup.h +++ b/library/cpp/messagebus/misc/granup.h @@ -1,50 +1,50 @@ -#pragma once - +#pragma once + #include <util/datetime/base.h> #include <util/system/guard.h> -#include <util/system/mutex.h> -#include <util/system/spinlock.h> - -namespace NBus { - template <typename TItem, typename TLocker = TSpinLock> - class TGranUp { - public: - TGranUp(TDuration gran) - : Gran(gran) +#include <util/system/mutex.h> +#include <util/system/spinlock.h> + +namespace NBus { + template <typename TItem, typename TLocker = TSpinLock> + class TGranUp { + public: + TGranUp(TDuration gran) + : Gran(gran) , Next(TInstant::MicroSeconds(0)) { } - + template <typename TFunctor> void Update(TFunctor functor, TInstant now, bool force = false) { if (force || now > Next) - Set(functor(), now); - } - + Set(functor(), now); + } + void Update(const TItem& item, TInstant now, bool force = false) { if (force || now > Next) - Set(item, now); - } - + Set(item, now); + } + TItem Get() const noexcept { TGuard<TLocker> guard(Lock); - - return Item; - } - - protected: + + return Item; + } + + protected: void Set(const TItem& item, TInstant now) { TGuard<TLocker> guard(Lock); - - Item = item; - - Next = now + Gran; - } - - private: - const TDuration Gran; + + Item = item; + + Next = now + Gran; + } + + private: + const TDuration Gran; TLocker Lock; TItem Item; TInstant Next; - }; -} + }; +} diff --git a/library/cpp/messagebus/misc/tokenquota.h b/library/cpp/messagebus/misc/tokenquota.h index 190547fa54..954cf0f0d7 100644 --- a/library/cpp/messagebus/misc/tokenquota.h +++ b/library/cpp/messagebus/misc/tokenquota.h @@ -1,83 +1,83 @@ -#pragma once - -#include <util/system/atomic.h> - -namespace NBus { - /* Consumer and feeder quota model impl. - - Consumer thread only calls: - Acquire(), fetches tokens for usage from bucket; - Consume(), eats given amount of tokens, must not - be greater than Value() items; - - Other threads (feeders) calls: - Return(), put used tokens back to bucket; - */ - - class TTokenQuota { - public: - TTokenQuota(bool enabled, size_t tokens, size_t wake) - : Enabled(tokens > 0 ? enabled : false) - , Acquired(0) - , WakeLev(wake < 1 ? Max<size_t>(1, tokens / 2) : 0) - , Tokens_(tokens) +#pragma once + +#include <util/system/atomic.h> + +namespace NBus { + /* Consumer and feeder quota model impl. + + Consumer thread only calls: + Acquire(), fetches tokens for usage from bucket; + Consume(), eats given amount of tokens, must not + be greater than Value() items; + + Other threads (feeders) calls: + Return(), put used tokens back to bucket; + */ + + class TTokenQuota { + public: + TTokenQuota(bool enabled, size_t tokens, size_t wake) + : Enabled(tokens > 0 ? enabled : false) + , Acquired(0) + , WakeLev(wake < 1 ? Max<size_t>(1, tokens / 2) : 0) + , Tokens_(tokens) { Y_UNUSED(padd_); } - + bool Acquire(TAtomic level = 1, bool force = false) { level = Max(TAtomicBase(level), TAtomicBase(1)); - + if (Enabled && (Acquired < level || force)) { Acquired += AtomicSwap(&Tokens_, 0); - } - - return !Enabled || Acquired >= level; - } - + } + + return !Enabled || Acquired >= level; + } + void Consume(size_t items) { if (Enabled) { Y_ASSERT(Acquired >= TAtomicBase(items)); - - Acquired -= items; - } - } - + + Acquired -= items; + } + } + bool Return(size_t items_) noexcept { if (!Enabled || items_ == 0) - return false; - - const TAtomic items = items_; - const TAtomic value = AtomicAdd(Tokens_, items); - - return (value - items < WakeLev && value >= WakeLev); - } - + return false; + + const TAtomic items = items_; + const TAtomic value = AtomicAdd(Tokens_, items); + + return (value - items < WakeLev && value >= WakeLev); + } + bool IsEnabled() const noexcept { - return Enabled; - } - + return Enabled; + } + bool IsAboveWake() const noexcept { - return !Enabled || (WakeLev <= AtomicGet(Tokens_)); - } - + return !Enabled || (WakeLev <= AtomicGet(Tokens_)); + } + size_t Tokens() const noexcept { - return Acquired + AtomicGet(Tokens_); - } - + return Acquired + AtomicGet(Tokens_); + } + size_t Check(const TAtomic level) const noexcept { - return !Enabled || level <= Acquired; - } - - private: + return !Enabled || level <= Acquired; + } + + private: bool Enabled; TAtomicBase Acquired; - const TAtomicBase WakeLev; + const TAtomicBase WakeLev; TAtomic Tokens_; - - /* This padd requires for align Tokens_ member on its own - CPU cacheline. */ - + + /* This padd requires for align Tokens_ member on its own + CPU cacheline. */ + ui64 padd_; - }; -} + }; +} |