aboutsummaryrefslogtreecommitdiffstats
path: root/yt
diff options
context:
space:
mode:
authorrobot-piglet <robot-piglet@yandex-team.com>2024-11-08 22:23:35 +0300
committerrobot-piglet <robot-piglet@yandex-team.com>2024-11-08 22:32:25 +0300
commit1885c7f026a96d8fd5c7eced266e381c1d8bdfee (patch)
treee339902c9cf12f27410bc85f3823a77e3d677b79 /yt
parent270fb9d74411407b11560a659568c32076af41c1 (diff)
downloadydb-1885c7f026a96d8fd5c7eced266e381c1d8bdfee.tar.gz
Intermediate changes
commit_hash:b2b69ddfc6b8407ae1b67895aee91f96e439d07d
Diffstat (limited to 'yt')
-rw-r--r--yt/yt/core/misc/async_slru_cache-inl.h61
-rw-r--r--yt/yt/core/misc/async_slru_cache.h10
-rw-r--r--yt/yt/core/misc/unittests/async_slru_cache_ut.cpp99
3 files changed, 132 insertions, 38 deletions
diff --git a/yt/yt/core/misc/async_slru_cache-inl.h b/yt/yt/core/misc/async_slru_cache-inl.h
index ed28ce6942..d1ce44a3a6 100644
--- a/yt/yt/core/misc/async_slru_cache-inl.h
+++ b/yt/yt/core/misc/async_slru_cache-inl.h
@@ -341,7 +341,7 @@ void TAsyncSlruCacheBase<TKey, TValue, THash>::Reconfigure(const TSlruCacheDynam
auto writerGuard = WriterGuard(shard.SpinLock);
shard.Reconfigure(shardCapacity, youngerSizeFraction);
shard.DrainTouchBuffer();
- NotifyOnTrim(shard.Trim(writerGuard), nullptr);
+ TrimWithNotify(&shard, writerGuard, nullptr);
}
}
@@ -544,7 +544,7 @@ TAsyncSlruCacheBase<TKey, TValue, THash>::DoLookup(TShard* shard, const TKey& ke
Counters_.SyncHitCounter.Increment();
// NB: Releases the lock.
- NotifyOnTrim(shard->Trim(writerGuard), value);
+ TrimWithNotify(shard, writerGuard, value);
if (GhostCachesEnabled_.load()) {
shard->SmallGhost.Resurrect(value, weight);
@@ -649,7 +649,7 @@ auto TAsyncSlruCacheBase<TKey, TValue, THash>::BeginInsert(const TKey& key, i64
shard->UpdateCookie(item, /*countDelta*/ 1, cookieWeight);
if (cookieWeight > 0) {
// NB: Releases the lock.
- NotifyOnTrim(shard->Trim(guard), nullptr, cookieWeight);
+ TrimWithNotify(shard, guard, nullptr, cookieWeight);
}
guard.Release();
@@ -681,7 +681,7 @@ auto TAsyncSlruCacheBase<TKey, TValue, THash>::BeginInsert(const TKey& key, i64
Counters_.SyncHitCounter.Increment();
// NB: Releases the lock.
- NotifyOnTrim(shard->Trim(guard), value);
+ TrimWithNotify(shard, guard, value);
guard.Release();
@@ -729,7 +729,7 @@ void TAsyncSlruCacheBase<TKey, TValue, THash>::UpdateCookieWeight(const TInsertC
shard->DrainTouchBuffer();
// NB: Releases the lock.
- NotifyOnTrim(shard->Trim(guard), nullptr, weightDelta);
+ TrimWithNotify(shard, guard, nullptr, weightDelta);
} else {
guard.Release();
OnWeightUpdated(weightDelta);
@@ -774,7 +774,7 @@ void TAsyncSlruCacheBase<TKey, TValue, THash>::EndInsert(const TInsertCookie& in
Counters_.AsyncHitWeightCounter.Increment(weight * item->AsyncHitCount.load());
// NB: Releases the lock.
- NotifyOnTrim(shard->Trim(guard), value, -cookieWeight);
+ TrimWithNotify(shard, guard, value, -cookieWeight);
// We do not want to break the ghost cache invariants, according to which either EndInsert
// or CancelInsert must be called for each item in Inserting state. So we end the insertion
@@ -950,7 +950,7 @@ void TAsyncSlruCacheBase<TKey, TValue, THash>::UpdateWeight(const TKey& key)
Counters_.MissedWeightCounter.Increment(weightDelta);
}
- NotifyOnTrim(shard->Trim(guard), nullptr, weightDelta);
+ TrimWithNotify(shard, guard, nullptr, weightDelta);
if (GhostCachesEnabled_.load()) {
shard->SmallGhost.UpdateWeight(key, newWeight);
@@ -1315,10 +1315,8 @@ void TAsyncSlruCacheBase<TKey, TValue, THash>::TGhostShard::Trim(NThreading::TWr
template <class TKey, class TValue, class THash>
std::vector<typename TAsyncSlruCacheBase<TKey, TValue, THash>::TValuePtr>
-TAsyncSlruCacheBase<TKey, TValue, THash>::TShard::Trim(NThreading::TWriterGuard<NThreading::TReaderWriterSpinLock>& guard)
+TAsyncSlruCacheBase<TKey, TValue, THash>::TShard::Trim(const TIntrusiveListWithAutoDelete<TItem, TDelete>& evictedItems)
{
- auto evictedItems = this->TrimNoDelete();
-
Parent->Size_ -= static_cast<int>(evictedItems.Size());
std::vector<TValuePtr> evictedValues;
@@ -1338,6 +1336,32 @@ TAsyncSlruCacheBase<TKey, TValue, THash>::TShard::Trim(NThreading::TWriterGuard<
evictedValues.push_back(std::move(value));
}
+ return evictedValues;
+}
+
+template <class TKey, class TValue, class THash>
+std::vector<typename TAsyncSlruCacheBase<TKey, TValue, THash>::TValuePtr>
+TAsyncSlruCacheBase<TKey, TValue, THash>::TrimWithNotify(
+ TShard* shard,
+ NThreading::TWriterGuard<NThreading::TReaderWriterSpinLock>& guard,
+ const TValuePtr& insertedValue,
+ i64 weightDelta)
+{
+ VERIFY_SPINLOCK_AFFINITY(shard->SpinLock);
+
+ auto evictedItems = shard->TrimNoDelete();
+ auto evictedValues = shard->Trim(evictedItems);
+
+ if (weightDelta != 0) {
+ OnWeightUpdated(weightDelta);
+ }
+ if (insertedValue) {
+ OnAdded(insertedValue);
+ }
+ for (const auto& value : evictedValues) {
+ OnRemoved(value);
+ }
+
// NB. Evicted items must die outside of critical section.
guard.Release();
@@ -1365,23 +1389,6 @@ void TAsyncSlruCacheBase<TKey, TValue, THash>::TShard::OnCookieUpdated(i64 delta
Parent->CookieWeightCounter_ += deltaWeight;
}
-template <class TKey, class TValue, class THash>
-void TAsyncSlruCacheBase<TKey, TValue, THash>::NotifyOnTrim(
- const std::vector<TValuePtr>& evictedValues,
- const TValuePtr& insertedValue,
- i64 weightDelta)
-{
- if (weightDelta != 0) {
- OnWeightUpdated(weightDelta);
- }
- if (insertedValue) {
- OnAdded(insertedValue);
- }
- for (const auto& value : evictedValues) {
- OnRemoved(value);
- }
-}
-
////////////////////////////////////////////////////////////////////////////////
template <class TKey, class TValue, class THash>
diff --git a/yt/yt/core/misc/async_slru_cache.h b/yt/yt/core/misc/async_slru_cache.h
index b6ce9f3b98..e4395185bb 100644
--- a/yt/yt/core/misc/async_slru_cache.h
+++ b/yt/yt/core/misc/async_slru_cache.h
@@ -410,8 +410,8 @@ private:
TGhostShard SmallGhost;
TGhostShard LargeGhost;
- //! Trims the lists and releases the guard. Returns the list of evicted items.
- std::vector<TValuePtr> Trim(NThreading::TWriterGuard<NThreading::TReaderWriterSpinLock>& guard);
+ //! Returns the list of evicted items.
+ std::vector<TValuePtr> Trim(const TIntrusiveListWithAutoDelete<TItem, TDelete>& evictedItems);
protected:
void OnYoungerUpdated(i64 deltaCount, i64 deltaWeight);
@@ -451,7 +451,11 @@ private:
//! Calls OnAdded on OnRemoved for the values evicted with Trim(). If the trim was caused by insertion, then
//! insertedValue must be the value, insertion of which caused trim. Otherwise, insertedValue must be nullptr.
//! If the trim was causes by weight update or weighted cookie, then weightDelta represents weight changes.
- void NotifyOnTrim(const std::vector<TValuePtr>& evictedValues, const TValuePtr& insertedValue, i64 weightDelta = 0);
+ std::vector<TValuePtr> TrimWithNotify(
+ TShard* shard,
+ NThreading::TWriterGuard<NThreading::TReaderWriterSpinLock>& guard,
+ const TValuePtr& insertedValue,
+ i64 weightDelta = 0);
void UpdateCookieWeight(const TInsertCookie& insertCookie, i64 newWeight);
void EndInsert(const TInsertCookie& insertCookie, TValuePtr value);
diff --git a/yt/yt/core/misc/unittests/async_slru_cache_ut.cpp b/yt/yt/core/misc/unittests/async_slru_cache_ut.cpp
index 64c166be19..b2d72822db 100644
--- a/yt/yt/core/misc/unittests/async_slru_cache_ut.cpp
+++ b/yt/yt/core/misc/unittests/async_slru_cache_ut.cpp
@@ -1,5 +1,9 @@
#include <yt/yt/core/test_framework/framework.h>
+#include <yt/yt/core/concurrency/public.h>
+#include <yt/yt/core/concurrency/fair_share_action_queue.h>
+#include <yt/yt/core/concurrency/thread_pool.h>
+
#include <yt/yt/core/misc/async_slru_cache.h>
#include <yt/yt/core/misc/property.h>
@@ -14,6 +18,10 @@ using namespace NProfiling;
////////////////////////////////////////////////////////////////////////////////
+const NLogging::TLogger Logger("Main");
+
+////////////////////////////////////////////////////////////////////////////////
+
DECLARE_REFCOUNTED_CLASS(TSimpleCachedValue)
class TSimpleCachedValue
@@ -109,9 +117,23 @@ public:
: TAsyncSlruCacheBase(std::move(config)), EnableResurrection_(enableResurrection)
{ }
- DEFINE_BYVAL_RO_PROPERTY(int, ItemCount, 0);
- DEFINE_BYVAL_RO_PROPERTY(int, TotalAdded, 0);
- DEFINE_BYVAL_RO_PROPERTY(int, TotalRemoved, 0);
+ int GetItemCount() const
+ {
+ auto guard = Guard(Lock_);
+ return Keys_.size();
+ }
+
+ int GetTotalAdded() const
+ {
+ auto guard = Guard(Lock_);
+ return TotalAdded_;
+ }
+
+ int GetTotalRemoved() const
+ {
+ auto guard = Guard(Lock_);
+ return TotalRemoved_;
+ }
protected:
i64 GetWeight(const TSimpleCachedValuePtr& value) const override
@@ -119,24 +141,42 @@ protected:
return value->Weight;
}
- void OnAdded(const TSimpleCachedValuePtr& /*value*/) override
+ void OnAdded(const TSimpleCachedValuePtr& value) override
{
- ++ItemCount_;
+ YT_LOG_DEBUG("Item add (Item: %v)", value->GetKey());
+ auto guard = Guard(Lock_);
+
+ if (!Keys_.find(value->GetKey()).IsEnd()) {
+ YT_LOG_ALERT("Item already exist (Item: %v)", value->GetKey());
+ }
+
+ EmplaceOrCrash(Keys_, value->GetKey());
++TotalAdded_;
}
- void OnRemoved(const TSimpleCachedValuePtr& /*value*/) override
+ void OnRemoved(const TSimpleCachedValuePtr& value) override
{
- --ItemCount_;
+ YT_LOG_DEBUG("Item remove (Item: %v)", value->GetKey());
+ auto guard = Guard(Lock_);
+
+ if (Keys_.find(value->GetKey()).IsEnd()) {
+ YT_LOG_ALERT("Item not found (Item: %v)", value->GetKey());
+ }
+
+ EraseOrCrash(Keys_, value->GetKey());
++TotalRemoved_;
- EXPECT_GE(ItemCount_, 0);
}
+
bool IsResurrectionSupported() const override
{
return EnableResurrection_;
}
private:
+ YT_DECLARE_SPIN_LOCK(NThreading::TSpinLock, Lock_);
+ THashSet<int> Keys_;
+ int TotalAdded_ = 0;
+ int TotalRemoved_ = 0;
bool EnableResurrection_;
};
@@ -464,6 +504,49 @@ TEST(TAsyncSlruCacheTest, AddRemoveWithResurrection)
}
}
+TEST(TAsyncSlruCacheTest, AddRemoveStressTest)
+{
+ auto threadPool = NConcurrency::CreateThreadPool(2, "AddRemoveStressTest");
+
+ constexpr int cacheSize = 5;
+ constexpr int valueCount = 20;
+ auto config = CreateCacheConfig(cacheSize);
+ auto cache = New<TCountingSlruCache>(std::move(config));
+
+ std::vector<TSimpleCachedValuePtr> values;
+
+ for (int i = 0; i < valueCount; ++i) {
+ values.push_back(New<TSimpleCachedValue>(i, i));
+ }
+
+ auto callback = BIND([&] {
+ std::vector<TCountingSlruCache::TInsertCookie> cookies;
+
+ for (int i = 0; i < valueCount; ++i) {
+ auto cookie = cache->BeginInsert(i);
+ cookies.emplace_back(std::move(cookie));
+ }
+
+ for (int i = 0; i < valueCount; ++i) {
+ cookies.back().EndInsert(values[i]);
+ cookies.pop_back();
+ }
+ });
+
+ for (int i = 0; i < 100; i++) {
+ std::vector<TFuture<void>> futures;
+ futures.reserve(2);
+
+ for (int j = 0; j < 2; j++) {
+ futures.push_back(callback.AsyncVia(threadPool->GetInvoker()).Run());
+ }
+
+ NConcurrency::WaitFor(AllSucceeded(futures)).ThrowOnError();
+ }
+
+ threadPool->Shutdown();
+}
+
TEST(TAsyncSlruCacheTest, AddThenImmediatelyRemove)
{
constexpr int cacheSize = 1;