diff options
author | Alek5andr-Kotov <akotov@ydb.tech> | 2025-02-10 20:05:47 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-02-10 20:05:47 +0300 |
commit | d368f28d37310c61daf28813f016dc6f9bfafecd (patch) | |
tree | 86e0aee3f96d69106191d4cca95e54da79180bca /ydb/core/persqueue/pq_l2_cache.cpp | |
parent | ad3a69baf9c00f3dc48aa33bf820439c0663e7ff (diff) | |
download | ydb-d368f28d37310c61daf28813f016dc6f9bfafecd.tar.gz |
The keys in the block cache (#13553)
Diffstat (limited to 'ydb/core/persqueue/pq_l2_cache.cpp')
-rw-r--r-- | ydb/core/persqueue/pq_l2_cache.cpp | 34 |
1 files changed, 32 insertions, 2 deletions
diff --git a/ydb/core/persqueue/pq_l2_cache.cpp b/ydb/core/persqueue/pq_l2_cache.cpp index 404f45dbaa..d8ca1cb39e 100644 --- a/ydb/core/persqueue/pq_l2_cache.cpp +++ b/ydb/core/persqueue/pq_l2_cache.cpp @@ -34,6 +34,7 @@ void TPersQueueCacheL2::Handle(TEvPqCache::TEvCacheL2Request::TPtr& ev, const TA TouchBlobs(ctx, tabletId, request->ExpectedBlobs, false); RemoveBlobs(ctx, tabletId, request->RemovedBlobs); RegretBlobs(ctx, tabletId, request->MissedBlobs); + RenameBlobs(ctx, tabletId, request->RenamedBlobs); THashMap<TKey, TCacheValue::TPtr> evicted; AddBlobs(ctx, tabletId, request->StoredBlobs, evicted); @@ -46,7 +47,7 @@ void TPersQueueCacheL2::SendResponses(const TActorContext& ctx, const THashMap<T TInstant now = TAppData::TimeProvider->Now(); THashMap<TActorId, THolder<TCacheL2Response>> responses; - for (auto rm : evictedBlobs) { + for (const auto& rm : evictedBlobs) { const TKey& key = rm.first; TCacheValue::TPtr evicted = rm.second; @@ -57,7 +58,7 @@ void TPersQueueCacheL2::SendResponses(const TActorContext& ctx, const THashMap<T } Y_ABORT_UNLESS(key.TabletId == resp->TabletId, "PQ L2. Multiple topics in one PQ tablet."); - resp->Removed.push_back({key.Partition, key.Offset, key.PartNo, evicted}); + resp->Removed.emplace_back(key.Partition, key.Offset, key.PartNo, evicted); RetentionTime = now - evicted->GetAccessTime(); if (RetentionTime < KeepTime) @@ -72,6 +73,13 @@ void TPersQueueCacheL2::SendResponses(const TActorContext& ctx, const THashMap<T } } +void TPersQueueCacheL2::Handle(TEvPqCache::TEvCacheKeysRequest::TPtr& ev, const TActorContext& ctx) +{ + auto response = MakeHolder<TEvPqCache::TEvCacheKeysResponse>(); + response->RenamedKeys = RenamedKeys; + ctx.Send(ev->Sender, response.Release()); +} + /// @return outRemoved - map of evicted items. L1 should be noticed about them void TPersQueueCacheL2::AddBlobs(const TActorContext& ctx, ui64 tabletId, const TVector<TCacheBlobL2>& blobs, THashMap<TKey, TCacheValue::TPtr>& outEvicted) @@ -156,6 +164,28 @@ void TPersQueueCacheL2::RemoveBlobs(const TActorContext& ctx, ui64 tabletId, con } } +void TPersQueueCacheL2::RenameBlobs(const TActorContext& ctx, ui64 tabletId, + const TVector<std::pair<TCacheBlobL2, TCacheBlobL2>>& blobs) +{ + RenamedKeys += blobs.size(); + + for (const auto& [oldBlob, newBlob] : blobs) { + TKey oldKey(tabletId, oldBlob); + auto it = Cache.FindWithoutPromote(oldKey); + if (it == Cache.End()) { + continue; + } + + TKey newKey(tabletId, newBlob); + Cache.Insert(newKey, *it); + Cache.Erase(it); + + LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "PQ Cache (L2). Renamed. Tablet '" << tabletId + << "' old partition " << oldBlob.Partition << " old offset " << oldBlob.Offset + << " new partition " << newBlob.Partition << " new offset " << newBlob.Offset); + } +} + void TPersQueueCacheL2::TouchBlobs(const TActorContext& ctx, ui64 tabletId, const TVector<TCacheBlobL2>& blobs, bool isHit) { TInstant now = TAppData::TimeProvider->Now(); |