diff options
author | Devtools Arcadia <arcadia-devtools@yandex-team.ru> | 2022-02-07 18:08:42 +0300 |
---|---|---|
committer | Devtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net> | 2022-02-07 18:08:42 +0300 |
commit | 1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch) | |
tree | e26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/messagebus/misc | |
download | ydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz |
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/messagebus/misc')
-rw-r--r-- | library/cpp/messagebus/misc/atomic_box.h | 34 | ||||
-rw-r--r-- | library/cpp/messagebus/misc/granup.h | 50 | ||||
-rw-r--r-- | library/cpp/messagebus/misc/test_sync.h | 75 | ||||
-rw-r--r-- | library/cpp/messagebus/misc/tokenquota.h | 83 | ||||
-rw-r--r-- | library/cpp/messagebus/misc/weak_ptr.h | 99 | ||||
-rw-r--r-- | library/cpp/messagebus/misc/weak_ptr_ut.cpp | 46 |
6 files changed, 387 insertions, 0 deletions
diff --git a/library/cpp/messagebus/misc/atomic_box.h b/library/cpp/messagebus/misc/atomic_box.h new file mode 100644 index 0000000000..401621f933 --- /dev/null +++ b/library/cpp/messagebus/misc/atomic_box.h @@ -0,0 +1,34 @@ +#pragma once + +#include <util/system/atomic.h> + +// TAtomic with human interface +template <typename T> +class TAtomicBox { +private: + union { + TAtomic Value; + // when T is enum, it is convenient to inspect its content in gdb + T ValueForDebugger; + }; + + static_assert(sizeof(T) <= sizeof(TAtomic), "expect sizeof(T) <= sizeof(TAtomic)"); + +public: + TAtomicBox(T value = T()) + : Value(value) + { + } + + void Set(T value) { + AtomicSet(Value, (TAtomic)value); + } + + T Get() const { + return (T)AtomicGet(Value); + } + + bool CompareAndSet(T expected, T set) { + return AtomicCas(&Value, (TAtomicBase)set, (TAtomicBase)expected); + } +}; diff --git a/library/cpp/messagebus/misc/granup.h b/library/cpp/messagebus/misc/granup.h new file mode 100644 index 0000000000..36ecfebc93 --- /dev/null +++ b/library/cpp/messagebus/misc/granup.h @@ -0,0 +1,50 @@ +#pragma once + +#include <util/datetime/base.h> +#include <util/system/guard.h> +#include <util/system/mutex.h> +#include <util/system/spinlock.h> + +namespace NBus { + template <typename TItem, typename TLocker = TSpinLock> + class TGranUp { + public: + TGranUp(TDuration gran) + : Gran(gran) + , Next(TInstant::MicroSeconds(0)) + { + } + + template <typename TFunctor> + void Update(TFunctor functor, TInstant now, bool force = false) { + if (force || now > Next) + Set(functor(), now); + } + + void Update(const TItem& item, TInstant now, bool force = false) { + if (force || now > Next) + Set(item, now); + } + + TItem Get() const noexcept { + TGuard<TLocker> guard(Lock); + + return Item; + } + + protected: + void Set(const TItem& item, TInstant now) { + TGuard<TLocker> guard(Lock); + + Item = item; + + Next = now + Gran; + } + + private: + const TDuration Gran; + TLocker Lock; + TItem Item; + TInstant Next; + }; +} diff --git a/library/cpp/messagebus/misc/test_sync.h b/library/cpp/messagebus/misc/test_sync.h new file mode 100644 index 0000000000..be3f4f20b8 --- /dev/null +++ b/library/cpp/messagebus/misc/test_sync.h @@ -0,0 +1,75 @@ +#pragma once + +#include <util/system/condvar.h> +#include <util/system/mutex.h> + +class TTestSync { +private: + unsigned Current; + + TMutex Mutex; + TCondVar CondVar; + +public: + TTestSync() + : Current(0) + { + } + + void Inc() { + TGuard<TMutex> guard(Mutex); + + DoInc(); + CondVar.BroadCast(); + } + + unsigned Get() { + TGuard<TMutex> guard(Mutex); + + return Current; + } + + void WaitFor(unsigned n) { + TGuard<TMutex> guard(Mutex); + + Y_VERIFY(Current <= n, "too late, waiting for %d, already %d", n, Current); + + while (n > Current) { + CondVar.WaitI(Mutex); + } + } + + void WaitForAndIncrement(unsigned n) { + TGuard<TMutex> guard(Mutex); + + Y_VERIFY(Current <= n, "too late, waiting for %d, already %d", n, Current); + + while (n > Current) { + CondVar.WaitI(Mutex); + } + + DoInc(); + CondVar.BroadCast(); + } + + void CheckAndIncrement(unsigned n) { + TGuard<TMutex> guard(Mutex); + + Y_VERIFY(Current == n, "must be %d, currently %d", n, Current); + + DoInc(); + CondVar.BroadCast(); + } + + void Check(unsigned n) { + TGuard<TMutex> guard(Mutex); + + Y_VERIFY(Current == n, "must be %d, currently %d", n, Current); + } + +private: + void DoInc() { + unsigned r = ++Current; + Y_UNUSED(r); + } +}; diff --git a/library/cpp/messagebus/misc/tokenquota.h b/library/cpp/messagebus/misc/tokenquota.h new file mode 100644 index 0000000000..190547fa54 --- /dev/null +++ b/library/cpp/messagebus/misc/tokenquota.h @@ -0,0 +1,83 @@ +#pragma once + +#include <util/system/atomic.h> + +namespace NBus { + /* Consumer and feeder quota model impl. + + Consumer thread only calls: + Acquire(), fetches tokens for usage from bucket; + Consume(), eats given amount of tokens, must not + be greater than Value() items; + + Other threads (feeders) calls: + Return(), put used tokens back to bucket; + */ + + class TTokenQuota { + public: + TTokenQuota(bool enabled, size_t tokens, size_t wake) + : Enabled(tokens > 0 ? enabled : false) + , Acquired(0) + , WakeLev(wake < 1 ? Max<size_t>(1, tokens / 2) : 0) + , Tokens_(tokens) + { + Y_UNUSED(padd_); + } + + bool Acquire(TAtomic level = 1, bool force = false) { + level = Max(TAtomicBase(level), TAtomicBase(1)); + + if (Enabled && (Acquired < level || force)) { + Acquired += AtomicSwap(&Tokens_, 0); + } + + return !Enabled || Acquired >= level; + } + + void Consume(size_t items) { + if (Enabled) { + Y_ASSERT(Acquired >= TAtomicBase(items)); + + Acquired -= items; + } + } + + bool Return(size_t items_) noexcept { + if (!Enabled || items_ == 0) + return false; + + const TAtomic items = items_; + const TAtomic value = AtomicAdd(Tokens_, items); + + return (value - items < WakeLev && value >= WakeLev); + } + + bool IsEnabled() const noexcept { + return Enabled; + } + + bool IsAboveWake() const noexcept { + return !Enabled || (WakeLev <= AtomicGet(Tokens_)); + } + + size_t Tokens() const noexcept { + return Acquired + AtomicGet(Tokens_); + } + + size_t Check(const TAtomic level) const noexcept { + return !Enabled || level <= Acquired; + } + + private: + bool Enabled; + TAtomicBase Acquired; + const TAtomicBase WakeLev; + TAtomic Tokens_; + + /* This padd requires for align Tokens_ member on its own + CPU cacheline. */ + + ui64 padd_; + }; +} diff --git a/library/cpp/messagebus/misc/weak_ptr.h b/library/cpp/messagebus/misc/weak_ptr.h new file mode 100644 index 0000000000..70fdeb0e2a --- /dev/null +++ b/library/cpp/messagebus/misc/weak_ptr.h @@ -0,0 +1,99 @@ +#pragma once + +#include <util/generic/ptr.h> +#include <util/system/mutex.h> + +template <typename T> +struct TWeakPtr; + +template <typename TSelf> +struct TWeakRefCounted { + template <typename> + friend struct TWeakPtr; + +private: + struct TRef: public TAtomicRefCount<TRef> { + TMutex Mutex; + TSelf* Outer; + + TRef(TSelf* outer) + : Outer(outer) + { + } + + void Release() { + TGuard<TMutex> g(Mutex); + Y_ASSERT(!!Outer); + Outer = nullptr; + } + + TIntrusivePtr<TSelf> Get() { + TGuard<TMutex> g(Mutex); + Y_ASSERT(!Outer || Outer->RefCount() > 0); + return Outer; + } + }; + + TAtomicCounter Counter; + TIntrusivePtr<TRef> RefPtr; + +public: + TWeakRefCounted() + : RefPtr(new TRef(static_cast<TSelf*>(this))) + { + } + + void Ref() { + Counter.Inc(); + } + + void UnRef() { + if (Counter.Dec() == 0) { + RefPtr->Release(); + + // drop is to prevent dtor from reading it + RefPtr.Drop(); + + delete static_cast<TSelf*>(this); + } + } + + void DecRef() { + Counter.Dec(); + } + + unsigned RefCount() const { + return Counter.Val(); + } +}; + +template <typename T> +struct TWeakPtr { +private: + typedef TIntrusivePtr<typename T::TRef> TRefPtr; + TRefPtr RefPtr; + +public: + TWeakPtr() { + } + + TWeakPtr(T* t) { + if (!!t) { + RefPtr = t->RefPtr; + } + } + + TWeakPtr(TIntrusivePtr<T> t) { + if (!!t) { + RefPtr = t->RefPtr; + } + } + + TIntrusivePtr<T> Get() { + if (!RefPtr) { + return nullptr; + } else { + return RefPtr->Get(); + } + } +}; diff --git a/library/cpp/messagebus/misc/weak_ptr_ut.cpp b/library/cpp/messagebus/misc/weak_ptr_ut.cpp new file mode 100644 index 0000000000..5a325278db --- /dev/null +++ b/library/cpp/messagebus/misc/weak_ptr_ut.cpp @@ -0,0 +1,46 @@ +#include <library/cpp/testing/unittest/registar.h> + +#include "weak_ptr.h" + +Y_UNIT_TEST_SUITE(TWeakPtrTest) { + struct TWeakPtrTester: public TWeakRefCounted<TWeakPtrTester> { + int* const CounterPtr; + + TWeakPtrTester(int* counterPtr) + : CounterPtr(counterPtr) + { + } + ~TWeakPtrTester() { + ++*CounterPtr; + } + }; + + Y_UNIT_TEST(Simple) { + int destroyCount = 0; + + TIntrusivePtr<TWeakPtrTester> p(new TWeakPtrTester(&destroyCount)); + + UNIT_ASSERT(!!p); + UNIT_ASSERT_VALUES_EQUAL(1u, p->RefCount()); + + TWeakPtr<TWeakPtrTester> p2(p); + + UNIT_ASSERT_VALUES_EQUAL(1u, p->RefCount()); + + { + TIntrusivePtr<TWeakPtrTester> p3 = p2.Get(); + UNIT_ASSERT(!!p3); + UNIT_ASSERT_VALUES_EQUAL(2u, p->RefCount()); + } + + p.Drop(); + UNIT_ASSERT_VALUES_EQUAL(1, destroyCount); + + { + TIntrusivePtr<TWeakPtrTester> p3 = p2.Get(); + UNIT_ASSERT(!p3); + } + + UNIT_ASSERT_VALUES_EQUAL(1, destroyCount); + } +} |