diff options
author | don-dron <don-dron@yandex-team.com> | 2023-11-22 17:57:44 +0300 |
---|---|---|
committer | don-dron <don-dron@yandex-team.com> | 2023-11-22 21:08:58 +0300 |
commit | 7f69fb6423bc67daf6e91be48486ba26b6407605 (patch) | |
tree | d91f490681d0f8d0ae574c5525f665ca9042c673 /yt | |
parent | 92257aa9c806d8184613e438720d289d54cf7783 (diff) | |
download | ydb-7f69fb6423bc67daf6e91be48486ba26b6407605.tar.gz |
YT-20423: P2P Snooper cache correcting
Diffstat (limited to 'yt')
-rw-r--r-- | yt/yt/core/misc/async_slru_cache-inl.h | 16 | ||||
-rw-r--r-- | yt/yt/core/misc/async_slru_cache.h | 2 | ||||
-rw-r--r-- | yt/yt/core/misc/sync_cache-inl.h | 66 | ||||
-rw-r--r-- | yt/yt/core/misc/sync_cache.h | 5 | ||||
-rw-r--r-- | yt/yt/core/misc/unittests/sync_cache_ut.cpp | 44 |
5 files changed, 133 insertions, 0 deletions
diff --git a/yt/yt/core/misc/async_slru_cache-inl.h b/yt/yt/core/misc/async_slru_cache-inl.h index 66b36a830c..8d70239ce0 100644 --- a/yt/yt/core/misc/async_slru_cache-inl.h +++ b/yt/yt/core/misc/async_slru_cache-inl.h @@ -877,6 +877,8 @@ void TAsyncSlruCacheBase<TKey, TValue, THash>::UpdateWeight(const TKey& key) NotifyOnTrim(shard->Trim(guard), nullptr); + OnWeightUpdated(weightDelta); + if (GhostCachesEnabled_.load()) { shard->SmallGhost.UpdateWeight(key, newWeight); shard->LargeGhost.UpdateWeight(key, newWeight); @@ -910,6 +912,10 @@ void TAsyncSlruCacheBase<TKey, TValue, THash>::OnRemoved(const TValuePtr& /*valu { } template <class TKey, class TValue, class THash> +void TAsyncSlruCacheBase<TKey, TValue, THash>::OnWeightUpdated(i64 /*weightDelta*/) +{ } + +template <class TKey, class TValue, class THash> bool TAsyncSlruCacheBase<TKey, TValue, THash>::IsResurrectionSupported() const { return true; @@ -1375,6 +1381,16 @@ TMemoryTrackingAsyncSlruCacheBase<TKey, TValue, THash>::~TMemoryTrackingAsyncSlr } template <class TKey, class TValue, class THash> +void TMemoryTrackingAsyncSlruCacheBase<TKey, TValue, THash>::OnWeightUpdated(i64 weightDelta) +{ + if (weightDelta > 0) { + MemoryTracker_->Acquire(weightDelta); + } else { + MemoryTracker_->Release(-weightDelta); + } +} + +template <class TKey, class TValue, class THash> void TMemoryTrackingAsyncSlruCacheBase<TKey, TValue, THash>::OnAdded(const TValuePtr& value) { MemoryTracker_->Acquire(this->GetWeight(value)); diff --git a/yt/yt/core/misc/async_slru_cache.h b/yt/yt/core/misc/async_slru_cache.h index 7ddad541f6..d83b51ccf3 100644 --- a/yt/yt/core/misc/async_slru_cache.h +++ b/yt/yt/core/misc/async_slru_cache.h @@ -223,6 +223,7 @@ protected: virtual void OnAdded(const TValuePtr& value); virtual void OnRemoved(const TValuePtr& value); + virtual void OnWeightUpdated(i64 weightDelta); //! Returns true if resurrection is supported. Note that the function must always returns the same value. virtual bool IsResurrectionSupported() const; @@ -464,6 +465,7 @@ protected: void OnAdded(const TValuePtr& value) override; void OnRemoved(const TValuePtr& value) override; + void OnWeightUpdated(i64 weightChanged) override; private: const IMemoryUsageTrackerPtr MemoryTracker_; diff --git a/yt/yt/core/misc/sync_cache-inl.h b/yt/yt/core/misc/sync_cache-inl.h index ef675192d1..b2f259c5ae 100644 --- a/yt/yt/core/misc/sync_cache-inl.h +++ b/yt/yt/core/misc/sync_cache-inl.h @@ -157,6 +157,57 @@ TSyncSlruCacheBase<TKey, TValue, THash>::GetAll() const } template <class TKey, class TValue, class THash> +void TSyncSlruCacheBase<TKey, TValue, THash>::UpdateWeight(const TValuePtr& value) +{ + UpdateWeight(value->GetKey()); +} + +template <class TKey, class TValue, class THash> +void TSyncSlruCacheBase<TKey, TValue, THash>::UpdateWeight(const TKey& key) +{ + auto* shard = GetShardByKey(key); + + auto guard = WriterGuard(shard->SpinLock); + + DrainTouchBuffer(shard); + + auto itemIt = shard->ItemMap.find(key); + if (itemIt == shard->ItemMap.end()) { + return; + } + + auto item = itemIt->second; + if (!item->Value) { + return; + } + + i64 newWeight = GetWeight(item->Value); + i64 weightDelta = newWeight - item->CachedWeight; + + YT_VERIFY(!item->Empty()); + + if (item->Younger) { + shard->YoungerWeightCounter += weightDelta; + YoungerWeightCounter_.fetch_add(weightDelta, std::memory_order::relaxed); + } else { + shard->OlderWeightCounter += weightDelta; + OlderWeightCounter_.fetch_add(weightDelta, std::memory_order::relaxed); + } + + item->CachedWeight += weightDelta; + + // If item weight increases, it means that some parts of the item were missing in cache, + // so add delta to missed weight. + if (weightDelta > 0) { + MissedWeightCounter_.Increment(weightDelta); + } + + Trim(shard, guard); + + OnWeightUpdated(weightDelta); +} + +template <class TKey, class TValue, class THash> bool TSyncSlruCacheBase<TKey, TValue, THash>::TryInsert(const TValuePtr& value, TValuePtr* existingValue) { const auto& key = value->GetKey(); @@ -345,6 +396,10 @@ void TSyncSlruCacheBase<TKey, TValue, THash>::OnRemoved(const TValuePtr& /*value { } template <class TKey, class TValue, class THash> +void TSyncSlruCacheBase<TKey, TValue, THash>::OnWeightUpdated(i64/*weightDelta*/) +{ } + +template <class TKey, class TValue, class THash> int TSyncSlruCacheBase<TKey, TValue, THash>::GetSize() const { return Size_.load(std::memory_order::relaxed); @@ -356,6 +411,7 @@ void TSyncSlruCacheBase<TKey, TValue, THash>::PushToYounger(TShard* shard, TItem YT_ASSERT(item->Empty()); shard->YoungerLruList.PushFront(item); auto weight = GetWeight(item->Value); + item->CachedWeight = weight; shard->YoungerWeightCounter += weight; YoungerWeightCounter_.fetch_add(weight, std::memory_order::relaxed); item->Younger = true; @@ -432,6 +488,16 @@ TMemoryTrackingSyncSlruCacheBase<TKey, TValue, THash>::~TMemoryTrackingSyncSlruC } template <class TKey, class TValue, class THash> +void TMemoryTrackingSyncSlruCacheBase<TKey, TValue, THash>::OnWeightUpdated(i64 weightDelta) +{ + if (weightDelta > 0) { + MemoryTracker_->Acquire(weightDelta); + } else { + MemoryTracker_->Release(-weightDelta); + } +} + +template <class TKey, class TValue, class THash> void TMemoryTrackingSyncSlruCacheBase<TKey, TValue, THash>::OnAdded(const TValuePtr& value) { MemoryTracker_->Acquire(this->GetWeight(value)); diff --git a/yt/yt/core/misc/sync_cache.h b/yt/yt/core/misc/sync_cache.h index ab02908bc0..11ddddb7dc 100644 --- a/yt/yt/core/misc/sync_cache.h +++ b/yt/yt/core/misc/sync_cache.h @@ -49,6 +49,8 @@ public: bool TryInsert(const TValuePtr& value, TValuePtr* existingValue = nullptr); bool TryRemove(const TKey& key); bool TryRemove(const TValuePtr& value); + void UpdateWeight(const TKey& key); + void UpdateWeight(const TValuePtr& value); void Clear(); virtual void Reconfigure(const TSlruCacheDynamicConfigPtr& config); @@ -66,6 +68,7 @@ protected: virtual i64 GetWeight(const TValuePtr& value) const; virtual void OnAdded(const TValuePtr& value); virtual void OnRemoved(const TValuePtr& value); + virtual void OnWeightUpdated(i64 weightDelta); private: struct TItem @@ -74,6 +77,7 @@ private: explicit TItem(TValuePtr value); TValuePtr Value; + i64 CachedWeight; bool Younger; }; @@ -137,6 +141,7 @@ protected: void OnAdded(const TValuePtr& value) override; void OnRemoved(const TValuePtr& value) override; + void OnWeightUpdated(i64 weightChanged) override; private: const IMemoryUsageTrackerPtr MemoryTracker_; diff --git a/yt/yt/core/misc/unittests/sync_cache_ut.cpp b/yt/yt/core/misc/unittests/sync_cache_ut.cpp index ea3b68604d..5ddbf6de9f 100644 --- a/yt/yt/core/misc/unittests/sync_cache_ut.cpp +++ b/yt/yt/core/misc/unittests/sync_cache_ut.cpp @@ -2,6 +2,8 @@ #include <yt/yt/core/misc/sync_cache.h> +#include <util/random/fast.h> + namespace NYT::NConcurrency { namespace { @@ -32,6 +34,20 @@ public: } }; +TSharedRef CreateRandomReference(TFastRng64& rnd, i64 size) +{ + TString s; + s.resize(size, '*'); + + for (i64 index = 0; index < size; ++index) { + s[index] = (char)rnd.GenRand64(); + } + + auto output = TSharedRef::FromString(s); + YT_ASSERT(static_cast<i64>(output.Size()) == size); + return output; +} + TEST(TSyncSlruCache, DownsizeSegfault) { auto config = New<TSlruCacheConfig>(); @@ -55,6 +71,34 @@ TEST(TSyncSlruCache, DownsizeSegfault) } } +TEST(TSyncSlruCache, EntryWeightUpdate) +{ + TFastRng64 rng(27); + auto config = New<TSlruCacheConfig>(); + config->Capacity = 1000; + + auto cache = New<TTestCache>(config); + for (; cache->GetSize() < 990;) { + cache->TryInsert(New<TTestValue>(TString(CreateRandomReference(rng, 256).ToStringBuf()))); + } + + EXPECT_GE(990, cache->GetSize()); + + for (auto& value : cache->GetAll()) { + value->Weight *= 2; + cache->UpdateWeight(value); + } + + EXPECT_GE(500, cache->GetSize()); + + for (auto& value : cache->GetAll()) { + value->Weight *= 2; + cache->UpdateWeight(value); + } + + EXPECT_GE(250, cache->GetSize()); +} + //////////////////////////////////////////////////////////////////////////////// } // namespace |