aboutsummaryrefslogtreecommitdiffstats
path: root/ydb/core/persqueue/pq_l2_cache.cpp
diff options
context:
space:
mode:
authorAlek5andr-Kotov <akotov@ydb.tech>2025-02-10 20:05:47 +0300
committerGitHub <noreply@github.com>2025-02-10 20:05:47 +0300
commitd368f28d37310c61daf28813f016dc6f9bfafecd (patch)
tree86e0aee3f96d69106191d4cca95e54da79180bca /ydb/core/persqueue/pq_l2_cache.cpp
parentad3a69baf9c00f3dc48aa33bf820439c0663e7ff (diff)
downloadydb-d368f28d37310c61daf28813f016dc6f9bfafecd.tar.gz
The keys in the block cache (#13553)
Diffstat (limited to 'ydb/core/persqueue/pq_l2_cache.cpp')
-rw-r--r--ydb/core/persqueue/pq_l2_cache.cpp34
1 files changed, 32 insertions, 2 deletions
diff --git a/ydb/core/persqueue/pq_l2_cache.cpp b/ydb/core/persqueue/pq_l2_cache.cpp
index 404f45dbaa..d8ca1cb39e 100644
--- a/ydb/core/persqueue/pq_l2_cache.cpp
+++ b/ydb/core/persqueue/pq_l2_cache.cpp
@@ -34,6 +34,7 @@ void TPersQueueCacheL2::Handle(TEvPqCache::TEvCacheL2Request::TPtr& ev, const TA
TouchBlobs(ctx, tabletId, request->ExpectedBlobs, false);
RemoveBlobs(ctx, tabletId, request->RemovedBlobs);
RegretBlobs(ctx, tabletId, request->MissedBlobs);
+ RenameBlobs(ctx, tabletId, request->RenamedBlobs);
THashMap<TKey, TCacheValue::TPtr> evicted;
AddBlobs(ctx, tabletId, request->StoredBlobs, evicted);
@@ -46,7 +47,7 @@ void TPersQueueCacheL2::SendResponses(const TActorContext& ctx, const THashMap<T
TInstant now = TAppData::TimeProvider->Now();
THashMap<TActorId, THolder<TCacheL2Response>> responses;
- for (auto rm : evictedBlobs) {
+ for (const auto& rm : evictedBlobs) {
const TKey& key = rm.first;
TCacheValue::TPtr evicted = rm.second;
@@ -57,7 +58,7 @@ void TPersQueueCacheL2::SendResponses(const TActorContext& ctx, const THashMap<T
}
Y_ABORT_UNLESS(key.TabletId == resp->TabletId, "PQ L2. Multiple topics in one PQ tablet.");
- resp->Removed.push_back({key.Partition, key.Offset, key.PartNo, evicted});
+ resp->Removed.emplace_back(key.Partition, key.Offset, key.PartNo, evicted);
RetentionTime = now - evicted->GetAccessTime();
if (RetentionTime < KeepTime)
@@ -72,6 +73,13 @@ void TPersQueueCacheL2::SendResponses(const TActorContext& ctx, const THashMap<T
}
}
+void TPersQueueCacheL2::Handle(TEvPqCache::TEvCacheKeysRequest::TPtr& ev, const TActorContext& ctx)
+{
+ auto response = MakeHolder<TEvPqCache::TEvCacheKeysResponse>();
+ response->RenamedKeys = RenamedKeys;
+ ctx.Send(ev->Sender, response.Release());
+}
+
/// @return outRemoved - map of evicted items. L1 should be noticed about them
void TPersQueueCacheL2::AddBlobs(const TActorContext& ctx, ui64 tabletId, const TVector<TCacheBlobL2>& blobs,
THashMap<TKey, TCacheValue::TPtr>& outEvicted)
@@ -156,6 +164,28 @@ void TPersQueueCacheL2::RemoveBlobs(const TActorContext& ctx, ui64 tabletId, con
}
}
+void TPersQueueCacheL2::RenameBlobs(const TActorContext& ctx, ui64 tabletId,
+ const TVector<std::pair<TCacheBlobL2, TCacheBlobL2>>& blobs)
+{
+ RenamedKeys += blobs.size();
+
+ for (const auto& [oldBlob, newBlob] : blobs) {
+ TKey oldKey(tabletId, oldBlob);
+ auto it = Cache.FindWithoutPromote(oldKey);
+ if (it == Cache.End()) {
+ continue;
+ }
+
+ TKey newKey(tabletId, newBlob);
+ Cache.Insert(newKey, *it);
+ Cache.Erase(it);
+
+ LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "PQ Cache (L2). Renamed. Tablet '" << tabletId
+ << "' old partition " << oldBlob.Partition << " old offset " << oldBlob.Offset
+ << " new partition " << newBlob.Partition << " new offset " << newBlob.Offset);
+ }
+}
+
void TPersQueueCacheL2::TouchBlobs(const TActorContext& ctx, ui64 tabletId, const TVector<TCacheBlobL2>& blobs, bool isHit)
{
TInstant now = TAppData::TimeProvider->Now();