aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/misc
diff options
context:
space:
mode:
authorsingle <single@yandex-team.ru>2022-02-10 16:50:29 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:50:29 +0300
commit8ae96df130bbede609c3504aa9af1bc6ff5361b3 (patch)
tree4751832974bd75ca721269aa54faa15d76032dfb /library/cpp/messagebus/misc
parent5d4e7b7c923852e0f6398791ec98a60cf9faab46 (diff)
downloadydb-8ae96df130bbede609c3504aa9af1bc6ff5361b3.tar.gz
Restoring authorship annotation for <single@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/messagebus/misc')
-rw-r--r--library/cpp/messagebus/misc/granup.h66
-rw-r--r--library/cpp/messagebus/misc/tokenquota.h126
2 files changed, 96 insertions, 96 deletions
diff --git a/library/cpp/messagebus/misc/granup.h b/library/cpp/messagebus/misc/granup.h
index 36ecfebc93..8b04aca597 100644
--- a/library/cpp/messagebus/misc/granup.h
+++ b/library/cpp/messagebus/misc/granup.h
@@ -1,50 +1,50 @@
-#pragma once
-
+#pragma once
+
#include <util/datetime/base.h>
#include <util/system/guard.h>
-#include <util/system/mutex.h>
-#include <util/system/spinlock.h>
-
-namespace NBus {
- template <typename TItem, typename TLocker = TSpinLock>
- class TGranUp {
- public:
- TGranUp(TDuration gran)
- : Gran(gran)
+#include <util/system/mutex.h>
+#include <util/system/spinlock.h>
+
+namespace NBus {
+ template <typename TItem, typename TLocker = TSpinLock>
+ class TGranUp {
+ public:
+ TGranUp(TDuration gran)
+ : Gran(gran)
, Next(TInstant::MicroSeconds(0))
{
}
-
+
template <typename TFunctor>
void Update(TFunctor functor, TInstant now, bool force = false) {
if (force || now > Next)
- Set(functor(), now);
- }
-
+ Set(functor(), now);
+ }
+
void Update(const TItem& item, TInstant now, bool force = false) {
if (force || now > Next)
- Set(item, now);
- }
-
+ Set(item, now);
+ }
+
TItem Get() const noexcept {
TGuard<TLocker> guard(Lock);
-
- return Item;
- }
-
- protected:
+
+ return Item;
+ }
+
+ protected:
void Set(const TItem& item, TInstant now) {
TGuard<TLocker> guard(Lock);
-
- Item = item;
-
- Next = now + Gran;
- }
-
- private:
- const TDuration Gran;
+
+ Item = item;
+
+ Next = now + Gran;
+ }
+
+ private:
+ const TDuration Gran;
TLocker Lock;
TItem Item;
TInstant Next;
- };
-}
+ };
+}
diff --git a/library/cpp/messagebus/misc/tokenquota.h b/library/cpp/messagebus/misc/tokenquota.h
index 190547fa54..954cf0f0d7 100644
--- a/library/cpp/messagebus/misc/tokenquota.h
+++ b/library/cpp/messagebus/misc/tokenquota.h
@@ -1,83 +1,83 @@
-#pragma once
-
-#include <util/system/atomic.h>
-
-namespace NBus {
- /* Consumer and feeder quota model impl.
-
- Consumer thread only calls:
- Acquire(), fetches tokens for usage from bucket;
- Consume(), eats given amount of tokens, must not
- be greater than Value() items;
-
- Other threads (feeders) calls:
- Return(), put used tokens back to bucket;
- */
-
- class TTokenQuota {
- public:
- TTokenQuota(bool enabled, size_t tokens, size_t wake)
- : Enabled(tokens > 0 ? enabled : false)
- , Acquired(0)
- , WakeLev(wake < 1 ? Max<size_t>(1, tokens / 2) : 0)
- , Tokens_(tokens)
+#pragma once
+
+#include <util/system/atomic.h>
+
+namespace NBus {
+ /* Consumer and feeder quota model impl.
+
+ Consumer thread only calls:
+ Acquire(), fetches tokens for usage from bucket;
+ Consume(), eats given amount of tokens, must not
+ be greater than Value() items;
+
+ Other threads (feeders) calls:
+ Return(), put used tokens back to bucket;
+ */
+
+ class TTokenQuota {
+ public:
+ TTokenQuota(bool enabled, size_t tokens, size_t wake)
+ : Enabled(tokens > 0 ? enabled : false)
+ , Acquired(0)
+ , WakeLev(wake < 1 ? Max<size_t>(1, tokens / 2) : 0)
+ , Tokens_(tokens)
{
Y_UNUSED(padd_);
}
-
+
bool Acquire(TAtomic level = 1, bool force = false) {
level = Max(TAtomicBase(level), TAtomicBase(1));
-
+
if (Enabled && (Acquired < level || force)) {
Acquired += AtomicSwap(&Tokens_, 0);
- }
-
- return !Enabled || Acquired >= level;
- }
-
+ }
+
+ return !Enabled || Acquired >= level;
+ }
+
void Consume(size_t items) {
if (Enabled) {
Y_ASSERT(Acquired >= TAtomicBase(items));
-
- Acquired -= items;
- }
- }
-
+
+ Acquired -= items;
+ }
+ }
+
bool Return(size_t items_) noexcept {
if (!Enabled || items_ == 0)
- return false;
-
- const TAtomic items = items_;
- const TAtomic value = AtomicAdd(Tokens_, items);
-
- return (value - items < WakeLev && value >= WakeLev);
- }
-
+ return false;
+
+ const TAtomic items = items_;
+ const TAtomic value = AtomicAdd(Tokens_, items);
+
+ return (value - items < WakeLev && value >= WakeLev);
+ }
+
bool IsEnabled() const noexcept {
- return Enabled;
- }
-
+ return Enabled;
+ }
+
bool IsAboveWake() const noexcept {
- return !Enabled || (WakeLev <= AtomicGet(Tokens_));
- }
-
+ return !Enabled || (WakeLev <= AtomicGet(Tokens_));
+ }
+
size_t Tokens() const noexcept {
- return Acquired + AtomicGet(Tokens_);
- }
-
+ return Acquired + AtomicGet(Tokens_);
+ }
+
size_t Check(const TAtomic level) const noexcept {
- return !Enabled || level <= Acquired;
- }
-
- private:
+ return !Enabled || level <= Acquired;
+ }
+
+ private:
bool Enabled;
TAtomicBase Acquired;
- const TAtomicBase WakeLev;
+ const TAtomicBase WakeLev;
TAtomic Tokens_;
-
- /* This padd requires for align Tokens_ member on its own
- CPU cacheline. */
-
+
+ /* This padd requires for align Tokens_ member on its own
+ CPU cacheline. */
+
ui64 padd_;
- };
-}
+ };
+}