aboutsummaryrefslogtreecommitdiffstats
path: root/yt
diff options
context:
space:
mode:
authordon-dron <don-dron@yandex-team.com>2023-11-22 17:57:44 +0300
committerdon-dron <don-dron@yandex-team.com>2023-11-22 21:08:58 +0300
commit7f69fb6423bc67daf6e91be48486ba26b6407605 (patch)
treed91f490681d0f8d0ae574c5525f665ca9042c673 /yt
parent92257aa9c806d8184613e438720d289d54cf7783 (diff)
downloadydb-7f69fb6423bc67daf6e91be48486ba26b6407605.tar.gz
YT-20423: P2P Snooper cache correcting
Diffstat (limited to 'yt')
-rw-r--r--yt/yt/core/misc/async_slru_cache-inl.h16
-rw-r--r--yt/yt/core/misc/async_slru_cache.h2
-rw-r--r--yt/yt/core/misc/sync_cache-inl.h66
-rw-r--r--yt/yt/core/misc/sync_cache.h5
-rw-r--r--yt/yt/core/misc/unittests/sync_cache_ut.cpp44
5 files changed, 133 insertions, 0 deletions
diff --git a/yt/yt/core/misc/async_slru_cache-inl.h b/yt/yt/core/misc/async_slru_cache-inl.h
index 66b36a830c..8d70239ce0 100644
--- a/yt/yt/core/misc/async_slru_cache-inl.h
+++ b/yt/yt/core/misc/async_slru_cache-inl.h
@@ -877,6 +877,8 @@ void TAsyncSlruCacheBase<TKey, TValue, THash>::UpdateWeight(const TKey& key)
NotifyOnTrim(shard->Trim(guard), nullptr);
+ OnWeightUpdated(weightDelta);
+
if (GhostCachesEnabled_.load()) {
shard->SmallGhost.UpdateWeight(key, newWeight);
shard->LargeGhost.UpdateWeight(key, newWeight);
@@ -910,6 +912,10 @@ void TAsyncSlruCacheBase<TKey, TValue, THash>::OnRemoved(const TValuePtr& /*valu
{ }
template <class TKey, class TValue, class THash>
+void TAsyncSlruCacheBase<TKey, TValue, THash>::OnWeightUpdated(i64 /*weightDelta*/)
+{ }
+
+template <class TKey, class TValue, class THash>
bool TAsyncSlruCacheBase<TKey, TValue, THash>::IsResurrectionSupported() const
{
return true;
@@ -1375,6 +1381,16 @@ TMemoryTrackingAsyncSlruCacheBase<TKey, TValue, THash>::~TMemoryTrackingAsyncSlr
}
template <class TKey, class TValue, class THash>
+void TMemoryTrackingAsyncSlruCacheBase<TKey, TValue, THash>::OnWeightUpdated(i64 weightDelta)
+{
+ if (weightDelta > 0) {
+ MemoryTracker_->Acquire(weightDelta);
+ } else {
+ MemoryTracker_->Release(-weightDelta);
+ }
+}
+
+template <class TKey, class TValue, class THash>
void TMemoryTrackingAsyncSlruCacheBase<TKey, TValue, THash>::OnAdded(const TValuePtr& value)
{
MemoryTracker_->Acquire(this->GetWeight(value));
diff --git a/yt/yt/core/misc/async_slru_cache.h b/yt/yt/core/misc/async_slru_cache.h
index 7ddad541f6..d83b51ccf3 100644
--- a/yt/yt/core/misc/async_slru_cache.h
+++ b/yt/yt/core/misc/async_slru_cache.h
@@ -223,6 +223,7 @@ protected:
virtual void OnAdded(const TValuePtr& value);
virtual void OnRemoved(const TValuePtr& value);
+ virtual void OnWeightUpdated(i64 weightDelta);
//! Returns true if resurrection is supported. Note that the function must always returns the same value.
virtual bool IsResurrectionSupported() const;
@@ -464,6 +465,7 @@ protected:
void OnAdded(const TValuePtr& value) override;
void OnRemoved(const TValuePtr& value) override;
+ void OnWeightUpdated(i64 weightChanged) override;
private:
const IMemoryUsageTrackerPtr MemoryTracker_;
diff --git a/yt/yt/core/misc/sync_cache-inl.h b/yt/yt/core/misc/sync_cache-inl.h
index ef675192d1..b2f259c5ae 100644
--- a/yt/yt/core/misc/sync_cache-inl.h
+++ b/yt/yt/core/misc/sync_cache-inl.h
@@ -157,6 +157,57 @@ TSyncSlruCacheBase<TKey, TValue, THash>::GetAll() const
}
template <class TKey, class TValue, class THash>
+void TSyncSlruCacheBase<TKey, TValue, THash>::UpdateWeight(const TValuePtr& value)
+{
+ UpdateWeight(value->GetKey());
+}
+
+template <class TKey, class TValue, class THash>
+void TSyncSlruCacheBase<TKey, TValue, THash>::UpdateWeight(const TKey& key)
+{
+ auto* shard = GetShardByKey(key);
+
+ auto guard = WriterGuard(shard->SpinLock);
+
+ DrainTouchBuffer(shard);
+
+ auto itemIt = shard->ItemMap.find(key);
+ if (itemIt == shard->ItemMap.end()) {
+ return;
+ }
+
+ auto item = itemIt->second;
+ if (!item->Value) {
+ return;
+ }
+
+ i64 newWeight = GetWeight(item->Value);
+ i64 weightDelta = newWeight - item->CachedWeight;
+
+ YT_VERIFY(!item->Empty());
+
+ if (item->Younger) {
+ shard->YoungerWeightCounter += weightDelta;
+ YoungerWeightCounter_.fetch_add(weightDelta, std::memory_order::relaxed);
+ } else {
+ shard->OlderWeightCounter += weightDelta;
+ OlderWeightCounter_.fetch_add(weightDelta, std::memory_order::relaxed);
+ }
+
+ item->CachedWeight += weightDelta;
+
+ // If item weight increases, it means that some parts of the item were missing in cache,
+ // so add delta to missed weight.
+ if (weightDelta > 0) {
+ MissedWeightCounter_.Increment(weightDelta);
+ }
+
+ Trim(shard, guard);
+
+ OnWeightUpdated(weightDelta);
+}
+
+template <class TKey, class TValue, class THash>
bool TSyncSlruCacheBase<TKey, TValue, THash>::TryInsert(const TValuePtr& value, TValuePtr* existingValue)
{
const auto& key = value->GetKey();
@@ -345,6 +396,10 @@ void TSyncSlruCacheBase<TKey, TValue, THash>::OnRemoved(const TValuePtr& /*value
{ }
template <class TKey, class TValue, class THash>
+void TSyncSlruCacheBase<TKey, TValue, THash>::OnWeightUpdated(i64/*weightDelta*/)
+{ }
+
+template <class TKey, class TValue, class THash>
int TSyncSlruCacheBase<TKey, TValue, THash>::GetSize() const
{
return Size_.load(std::memory_order::relaxed);
@@ -356,6 +411,7 @@ void TSyncSlruCacheBase<TKey, TValue, THash>::PushToYounger(TShard* shard, TItem
YT_ASSERT(item->Empty());
shard->YoungerLruList.PushFront(item);
auto weight = GetWeight(item->Value);
+ item->CachedWeight = weight;
shard->YoungerWeightCounter += weight;
YoungerWeightCounter_.fetch_add(weight, std::memory_order::relaxed);
item->Younger = true;
@@ -432,6 +488,16 @@ TMemoryTrackingSyncSlruCacheBase<TKey, TValue, THash>::~TMemoryTrackingSyncSlruC
}
template <class TKey, class TValue, class THash>
+void TMemoryTrackingSyncSlruCacheBase<TKey, TValue, THash>::OnWeightUpdated(i64 weightDelta)
+{
+ if (weightDelta > 0) {
+ MemoryTracker_->Acquire(weightDelta);
+ } else {
+ MemoryTracker_->Release(-weightDelta);
+ }
+}
+
+template <class TKey, class TValue, class THash>
void TMemoryTrackingSyncSlruCacheBase<TKey, TValue, THash>::OnAdded(const TValuePtr& value)
{
MemoryTracker_->Acquire(this->GetWeight(value));
diff --git a/yt/yt/core/misc/sync_cache.h b/yt/yt/core/misc/sync_cache.h
index ab02908bc0..11ddddb7dc 100644
--- a/yt/yt/core/misc/sync_cache.h
+++ b/yt/yt/core/misc/sync_cache.h
@@ -49,6 +49,8 @@ public:
bool TryInsert(const TValuePtr& value, TValuePtr* existingValue = nullptr);
bool TryRemove(const TKey& key);
bool TryRemove(const TValuePtr& value);
+ void UpdateWeight(const TKey& key);
+ void UpdateWeight(const TValuePtr& value);
void Clear();
virtual void Reconfigure(const TSlruCacheDynamicConfigPtr& config);
@@ -66,6 +68,7 @@ protected:
virtual i64 GetWeight(const TValuePtr& value) const;
virtual void OnAdded(const TValuePtr& value);
virtual void OnRemoved(const TValuePtr& value);
+ virtual void OnWeightUpdated(i64 weightDelta);
private:
struct TItem
@@ -74,6 +77,7 @@ private:
explicit TItem(TValuePtr value);
TValuePtr Value;
+ i64 CachedWeight;
bool Younger;
};
@@ -137,6 +141,7 @@ protected:
void OnAdded(const TValuePtr& value) override;
void OnRemoved(const TValuePtr& value) override;
+ void OnWeightUpdated(i64 weightChanged) override;
private:
const IMemoryUsageTrackerPtr MemoryTracker_;
diff --git a/yt/yt/core/misc/unittests/sync_cache_ut.cpp b/yt/yt/core/misc/unittests/sync_cache_ut.cpp
index ea3b68604d..5ddbf6de9f 100644
--- a/yt/yt/core/misc/unittests/sync_cache_ut.cpp
+++ b/yt/yt/core/misc/unittests/sync_cache_ut.cpp
@@ -2,6 +2,8 @@
#include <yt/yt/core/misc/sync_cache.h>
+#include <util/random/fast.h>
+
namespace NYT::NConcurrency {
namespace {
@@ -32,6 +34,20 @@ public:
}
};
+TSharedRef CreateRandomReference(TFastRng64& rnd, i64 size)
+{
+ TString s;
+ s.resize(size, '*');
+
+ for (i64 index = 0; index < size; ++index) {
+ s[index] = (char)rnd.GenRand64();
+ }
+
+ auto output = TSharedRef::FromString(s);
+ YT_ASSERT(static_cast<i64>(output.Size()) == size);
+ return output;
+}
+
TEST(TSyncSlruCache, DownsizeSegfault)
{
auto config = New<TSlruCacheConfig>();
@@ -55,6 +71,34 @@ TEST(TSyncSlruCache, DownsizeSegfault)
}
}
+TEST(TSyncSlruCache, EntryWeightUpdate)
+{
+ TFastRng64 rng(27);
+ auto config = New<TSlruCacheConfig>();
+ config->Capacity = 1000;
+
+ auto cache = New<TTestCache>(config);
+ for (; cache->GetSize() < 990;) {
+ cache->TryInsert(New<TTestValue>(TString(CreateRandomReference(rng, 256).ToStringBuf())));
+ }
+
+ EXPECT_GE(990, cache->GetSize());
+
+ for (auto& value : cache->GetAll()) {
+ value->Weight *= 2;
+ cache->UpdateWeight(value);
+ }
+
+ EXPECT_GE(500, cache->GetSize());
+
+ for (auto& value : cache->GetAll()) {
+ value->Weight *= 2;
+ cache->UpdateWeight(value);
+ }
+
+ EXPECT_GE(250, cache->GetSize());
+}
+
////////////////////////////////////////////////////////////////////////////////
} // namespace