diff options
author | robot-piglet <robot-piglet@yandex-team.com> | 2024-11-08 22:23:35 +0300 |
---|---|---|
committer | robot-piglet <robot-piglet@yandex-team.com> | 2024-11-08 22:32:25 +0300 |
commit | 1885c7f026a96d8fd5c7eced266e381c1d8bdfee (patch) | |
tree | e339902c9cf12f27410bc85f3823a77e3d677b79 /yt | |
parent | 270fb9d74411407b11560a659568c32076af41c1 (diff) | |
download | ydb-1885c7f026a96d8fd5c7eced266e381c1d8bdfee.tar.gz |
Intermediate changes
commit_hash:b2b69ddfc6b8407ae1b67895aee91f96e439d07d
Diffstat (limited to 'yt')
-rw-r--r-- | yt/yt/core/misc/async_slru_cache-inl.h | 61 | ||||
-rw-r--r-- | yt/yt/core/misc/async_slru_cache.h | 10 | ||||
-rw-r--r-- | yt/yt/core/misc/unittests/async_slru_cache_ut.cpp | 99 |
3 files changed, 132 insertions, 38 deletions
diff --git a/yt/yt/core/misc/async_slru_cache-inl.h b/yt/yt/core/misc/async_slru_cache-inl.h index ed28ce6942..d1ce44a3a6 100644 --- a/yt/yt/core/misc/async_slru_cache-inl.h +++ b/yt/yt/core/misc/async_slru_cache-inl.h @@ -341,7 +341,7 @@ void TAsyncSlruCacheBase<TKey, TValue, THash>::Reconfigure(const TSlruCacheDynam auto writerGuard = WriterGuard(shard.SpinLock); shard.Reconfigure(shardCapacity, youngerSizeFraction); shard.DrainTouchBuffer(); - NotifyOnTrim(shard.Trim(writerGuard), nullptr); + TrimWithNotify(&shard, writerGuard, nullptr); } } @@ -544,7 +544,7 @@ TAsyncSlruCacheBase<TKey, TValue, THash>::DoLookup(TShard* shard, const TKey& ke Counters_.SyncHitCounter.Increment(); // NB: Releases the lock. - NotifyOnTrim(shard->Trim(writerGuard), value); + TrimWithNotify(shard, writerGuard, value); if (GhostCachesEnabled_.load()) { shard->SmallGhost.Resurrect(value, weight); @@ -649,7 +649,7 @@ auto TAsyncSlruCacheBase<TKey, TValue, THash>::BeginInsert(const TKey& key, i64 shard->UpdateCookie(item, /*countDelta*/ 1, cookieWeight); if (cookieWeight > 0) { // NB: Releases the lock. - NotifyOnTrim(shard->Trim(guard), nullptr, cookieWeight); + TrimWithNotify(shard, guard, nullptr, cookieWeight); } guard.Release(); @@ -681,7 +681,7 @@ auto TAsyncSlruCacheBase<TKey, TValue, THash>::BeginInsert(const TKey& key, i64 Counters_.SyncHitCounter.Increment(); // NB: Releases the lock. - NotifyOnTrim(shard->Trim(guard), value); + TrimWithNotify(shard, guard, value); guard.Release(); @@ -729,7 +729,7 @@ void TAsyncSlruCacheBase<TKey, TValue, THash>::UpdateCookieWeight(const TInsertC shard->DrainTouchBuffer(); // NB: Releases the lock. - NotifyOnTrim(shard->Trim(guard), nullptr, weightDelta); + TrimWithNotify(shard, guard, nullptr, weightDelta); } else { guard.Release(); OnWeightUpdated(weightDelta); @@ -774,7 +774,7 @@ void TAsyncSlruCacheBase<TKey, TValue, THash>::EndInsert(const TInsertCookie& in Counters_.AsyncHitWeightCounter.Increment(weight * item->AsyncHitCount.load()); // NB: Releases the lock. - NotifyOnTrim(shard->Trim(guard), value, -cookieWeight); + TrimWithNotify(shard, guard, value, -cookieWeight); // We do not want to break the ghost cache invariants, according to which either EndInsert // or CancelInsert must be called for each item in Inserting state. So we end the insertion @@ -950,7 +950,7 @@ void TAsyncSlruCacheBase<TKey, TValue, THash>::UpdateWeight(const TKey& key) Counters_.MissedWeightCounter.Increment(weightDelta); } - NotifyOnTrim(shard->Trim(guard), nullptr, weightDelta); + TrimWithNotify(shard, guard, nullptr, weightDelta); if (GhostCachesEnabled_.load()) { shard->SmallGhost.UpdateWeight(key, newWeight); @@ -1315,10 +1315,8 @@ void TAsyncSlruCacheBase<TKey, TValue, THash>::TGhostShard::Trim(NThreading::TWr template <class TKey, class TValue, class THash> std::vector<typename TAsyncSlruCacheBase<TKey, TValue, THash>::TValuePtr> -TAsyncSlruCacheBase<TKey, TValue, THash>::TShard::Trim(NThreading::TWriterGuard<NThreading::TReaderWriterSpinLock>& guard) +TAsyncSlruCacheBase<TKey, TValue, THash>::TShard::Trim(const TIntrusiveListWithAutoDelete<TItem, TDelete>& evictedItems) { - auto evictedItems = this->TrimNoDelete(); - Parent->Size_ -= static_cast<int>(evictedItems.Size()); std::vector<TValuePtr> evictedValues; @@ -1338,6 +1336,32 @@ TAsyncSlruCacheBase<TKey, TValue, THash>::TShard::Trim(NThreading::TWriterGuard< evictedValues.push_back(std::move(value)); } + return evictedValues; +} + +template <class TKey, class TValue, class THash> +std::vector<typename TAsyncSlruCacheBase<TKey, TValue, THash>::TValuePtr> +TAsyncSlruCacheBase<TKey, TValue, THash>::TrimWithNotify( + TShard* shard, + NThreading::TWriterGuard<NThreading::TReaderWriterSpinLock>& guard, + const TValuePtr& insertedValue, + i64 weightDelta) +{ + VERIFY_SPINLOCK_AFFINITY(shard->SpinLock); + + auto evictedItems = shard->TrimNoDelete(); + auto evictedValues = shard->Trim(evictedItems); + + if (weightDelta != 0) { + OnWeightUpdated(weightDelta); + } + if (insertedValue) { + OnAdded(insertedValue); + } + for (const auto& value : evictedValues) { + OnRemoved(value); + } + // NB. Evicted items must die outside of critical section. guard.Release(); @@ -1365,23 +1389,6 @@ void TAsyncSlruCacheBase<TKey, TValue, THash>::TShard::OnCookieUpdated(i64 delta Parent->CookieWeightCounter_ += deltaWeight; } -template <class TKey, class TValue, class THash> -void TAsyncSlruCacheBase<TKey, TValue, THash>::NotifyOnTrim( - const std::vector<TValuePtr>& evictedValues, - const TValuePtr& insertedValue, - i64 weightDelta) -{ - if (weightDelta != 0) { - OnWeightUpdated(weightDelta); - } - if (insertedValue) { - OnAdded(insertedValue); - } - for (const auto& value : evictedValues) { - OnRemoved(value); - } -} - //////////////////////////////////////////////////////////////////////////////// template <class TKey, class TValue, class THash> diff --git a/yt/yt/core/misc/async_slru_cache.h b/yt/yt/core/misc/async_slru_cache.h index b6ce9f3b98..e4395185bb 100644 --- a/yt/yt/core/misc/async_slru_cache.h +++ b/yt/yt/core/misc/async_slru_cache.h @@ -410,8 +410,8 @@ private: TGhostShard SmallGhost; TGhostShard LargeGhost; - //! Trims the lists and releases the guard. Returns the list of evicted items. - std::vector<TValuePtr> Trim(NThreading::TWriterGuard<NThreading::TReaderWriterSpinLock>& guard); + //! Returns the list of evicted items. + std::vector<TValuePtr> Trim(const TIntrusiveListWithAutoDelete<TItem, TDelete>& evictedItems); protected: void OnYoungerUpdated(i64 deltaCount, i64 deltaWeight); @@ -451,7 +451,11 @@ private: //! Calls OnAdded on OnRemoved for the values evicted with Trim(). If the trim was caused by insertion, then //! insertedValue must be the value, insertion of which caused trim. Otherwise, insertedValue must be nullptr. //! If the trim was causes by weight update or weighted cookie, then weightDelta represents weight changes. - void NotifyOnTrim(const std::vector<TValuePtr>& evictedValues, const TValuePtr& insertedValue, i64 weightDelta = 0); + std::vector<TValuePtr> TrimWithNotify( + TShard* shard, + NThreading::TWriterGuard<NThreading::TReaderWriterSpinLock>& guard, + const TValuePtr& insertedValue, + i64 weightDelta = 0); void UpdateCookieWeight(const TInsertCookie& insertCookie, i64 newWeight); void EndInsert(const TInsertCookie& insertCookie, TValuePtr value); diff --git a/yt/yt/core/misc/unittests/async_slru_cache_ut.cpp b/yt/yt/core/misc/unittests/async_slru_cache_ut.cpp index 64c166be19..b2d72822db 100644 --- a/yt/yt/core/misc/unittests/async_slru_cache_ut.cpp +++ b/yt/yt/core/misc/unittests/async_slru_cache_ut.cpp @@ -1,5 +1,9 @@ #include <yt/yt/core/test_framework/framework.h> +#include <yt/yt/core/concurrency/public.h> +#include <yt/yt/core/concurrency/fair_share_action_queue.h> +#include <yt/yt/core/concurrency/thread_pool.h> + #include <yt/yt/core/misc/async_slru_cache.h> #include <yt/yt/core/misc/property.h> @@ -14,6 +18,10 @@ using namespace NProfiling; //////////////////////////////////////////////////////////////////////////////// +const NLogging::TLogger Logger("Main"); + +//////////////////////////////////////////////////////////////////////////////// + DECLARE_REFCOUNTED_CLASS(TSimpleCachedValue) class TSimpleCachedValue @@ -109,9 +117,23 @@ public: : TAsyncSlruCacheBase(std::move(config)), EnableResurrection_(enableResurrection) { } - DEFINE_BYVAL_RO_PROPERTY(int, ItemCount, 0); - DEFINE_BYVAL_RO_PROPERTY(int, TotalAdded, 0); - DEFINE_BYVAL_RO_PROPERTY(int, TotalRemoved, 0); + int GetItemCount() const + { + auto guard = Guard(Lock_); + return Keys_.size(); + } + + int GetTotalAdded() const + { + auto guard = Guard(Lock_); + return TotalAdded_; + } + + int GetTotalRemoved() const + { + auto guard = Guard(Lock_); + return TotalRemoved_; + } protected: i64 GetWeight(const TSimpleCachedValuePtr& value) const override @@ -119,24 +141,42 @@ protected: return value->Weight; } - void OnAdded(const TSimpleCachedValuePtr& /*value*/) override + void OnAdded(const TSimpleCachedValuePtr& value) override { - ++ItemCount_; + YT_LOG_DEBUG("Item add (Item: %v)", value->GetKey()); + auto guard = Guard(Lock_); + + if (!Keys_.find(value->GetKey()).IsEnd()) { + YT_LOG_ALERT("Item already exist (Item: %v)", value->GetKey()); + } + + EmplaceOrCrash(Keys_, value->GetKey()); ++TotalAdded_; } - void OnRemoved(const TSimpleCachedValuePtr& /*value*/) override + void OnRemoved(const TSimpleCachedValuePtr& value) override { - --ItemCount_; + YT_LOG_DEBUG("Item remove (Item: %v)", value->GetKey()); + auto guard = Guard(Lock_); + + if (Keys_.find(value->GetKey()).IsEnd()) { + YT_LOG_ALERT("Item not found (Item: %v)", value->GetKey()); + } + + EraseOrCrash(Keys_, value->GetKey()); ++TotalRemoved_; - EXPECT_GE(ItemCount_, 0); } + bool IsResurrectionSupported() const override { return EnableResurrection_; } private: + YT_DECLARE_SPIN_LOCK(NThreading::TSpinLock, Lock_); + THashSet<int> Keys_; + int TotalAdded_ = 0; + int TotalRemoved_ = 0; bool EnableResurrection_; }; @@ -464,6 +504,49 @@ TEST(TAsyncSlruCacheTest, AddRemoveWithResurrection) } } +TEST(TAsyncSlruCacheTest, AddRemoveStressTest) +{ + auto threadPool = NConcurrency::CreateThreadPool(2, "AddRemoveStressTest"); + + constexpr int cacheSize = 5; + constexpr int valueCount = 20; + auto config = CreateCacheConfig(cacheSize); + auto cache = New<TCountingSlruCache>(std::move(config)); + + std::vector<TSimpleCachedValuePtr> values; + + for (int i = 0; i < valueCount; ++i) { + values.push_back(New<TSimpleCachedValue>(i, i)); + } + + auto callback = BIND([&] { + std::vector<TCountingSlruCache::TInsertCookie> cookies; + + for (int i = 0; i < valueCount; ++i) { + auto cookie = cache->BeginInsert(i); + cookies.emplace_back(std::move(cookie)); + } + + for (int i = 0; i < valueCount; ++i) { + cookies.back().EndInsert(values[i]); + cookies.pop_back(); + } + }); + + for (int i = 0; i < 100; i++) { + std::vector<TFuture<void>> futures; + futures.reserve(2); + + for (int j = 0; j < 2; j++) { + futures.push_back(callback.AsyncVia(threadPool->GetInvoker()).Run()); + } + + NConcurrency::WaitFor(AllSucceeded(futures)).ThrowOnError(); + } + + threadPool->Shutdown(); +} + TEST(TAsyncSlruCacheTest, AddThenImmediatelyRemove) { constexpr int cacheSize = 1; |