diff options
author | Alek5andr-Kotov <akotov@ydb.tech> | 2025-02-10 20:05:47 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-02-10 20:05:47 +0300 |
commit | d368f28d37310c61daf28813f016dc6f9bfafecd (patch) | |
tree | 86e0aee3f96d69106191d4cca95e54da79180bca | |
parent | ad3a69baf9c00f3dc48aa33bf820439c0663e7ff (diff) | |
download | ydb-d368f28d37310c61daf28813f016dc6f9bfafecd.tar.gz |
The keys in the block cache (#13553)
-rw-r--r-- | ydb/core/persqueue/cache_eviction.h | 127 | ||||
-rw-r--r-- | ydb/core/persqueue/map_subrange.h | 24 | ||||
-rw-r--r-- | ydb/core/persqueue/partition.cpp | 12 | ||||
-rw-r--r-- | ydb/core/persqueue/partition.h | 2 | ||||
-rw-r--r-- | ydb/core/persqueue/partition_id.h | 15 | ||||
-rw-r--r-- | ydb/core/persqueue/partition_write.cpp | 4 | ||||
-rw-r--r-- | ydb/core/persqueue/pq_l2_cache.cpp | 34 | ||||
-rw-r--r-- | ydb/core/persqueue/pq_l2_cache.h | 7 | ||||
-rw-r--r-- | ydb/core/persqueue/pq_l2_service.h | 11 | ||||
-rw-r--r-- | ydb/core/persqueue/read.h | 70 | ||||
-rw-r--r-- | ydb/core/persqueue/ut/cache_eviction_ut.cpp | 177 | ||||
-rw-r--r-- | ydb/core/persqueue/ut/partition_ut.cpp | 4 | ||||
-rw-r--r-- | ydb/core/persqueue/ut/ya.make | 1 | ||||
-rw-r--r-- | ydb/core/testlib/test_pq_client.h | 39 | ||||
-rw-r--r-- | ydb/public/sdk/cpp/src/client/topic/ut/topic_to_table_ut.cpp | 91 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/persqueue_ut.cpp | 8 |
16 files changed, 588 insertions, 38 deletions
diff --git a/ydb/core/persqueue/cache_eviction.h b/ydb/core/persqueue/cache_eviction.h index 8826101d40..6826fb8164 100644 --- a/ydb/core/persqueue/cache_eviction.h +++ b/ydb/core/persqueue/cache_eviction.h @@ -5,6 +5,7 @@ #include <ydb/core/base/appdata.h> #include <ydb/core/persqueue/events/internal.h> +#include <ydb/core/persqueue/map_subrange.h> namespace NKikimr { namespace NPQ { @@ -25,10 +26,18 @@ namespace NPQ { { } - bool operator == (const TBlobId& r) const { + bool operator==(const TBlobId& r) const { return Partition.IsEqual(r.Partition) && Offset == r.Offset && PartNo == r.PartNo; } + bool operator<(const TBlobId& r) const { + auto makeTuple = [](const TBlobId& v) { + return std::make_tuple(v.Partition, v.Offset, v.PartNo, v.Count, v.InternalPartsCount); + }; + + return makeTuple(*this) < makeTuple(r); + } + ui64 Hash() const { return Hash128to32((ui64(Partition.InternalPartitionId) << 17) + (Partition.IsSupportivePartition() ? 0 : (1 << 16)) + PartNo, Offset); } @@ -51,12 +60,26 @@ namespace NPQ { TypeWrite }; + struct TDeleteBlobRange { + TString Begin; + bool IncludeBegin; + TString End; + bool IncludeEnd; + }; + + struct TRenameBlob { + TString From; + TString To; + }; + ERequestType Type; TActorId Sender; ui64 CookiePQ; TPartitionId Partition; ui32 MetadataWritesCount; TVector<TRequestedBlob> Blobs; + TVector<TDeleteBlobRange> DeletedBlobs; + TVector<TRenameBlob> RenamedBlobs; TKvRequest(ERequestType type, TActorId sender, ui64 cookie, const TPartitionId& partition) : Type(type) @@ -175,13 +198,13 @@ namespace NPQ { , Source(SourcePrefetch) {} - const TCacheValue::TPtr GetBlob() const { return Blob.lock(); } + const TCacheValue::TPtr GetBlob() const { return Blob; } private: - TCacheValue::TWeakPtr Blob; + TCacheValue::TPtr Blob; }; - using TMapType = THashMap<TBlobId, TValueL1>; + using TMapType = TMap<TBlobId, TValueL1>; struct TCounters { ui64 SizeBytes = 0; @@ -251,30 +274,90 @@ namespace NPQ { { auto reqData = MakeHolder<TCacheL2Request>(TabletId); + DeleteBlobs(kvReq, *reqData, ctx); + RenameBlobs(kvReq, *reqData, ctx); + SaveBlobs(kvReq, *reqData, ctx); + + auto l2Request = MakeHolder<TEvPqCache::TEvCacheL2Request>(reqData.Release()); + ctx.Send(MakePersQueueL2CacheID(), l2Request.Release()); // -> L2 + } + + void SaveBlobs(const TKvRequest& kvReq, TCacheL2Request& reqData, const TActorContext& ctx) + { for (const TRequestedBlob& reqBlob : kvReq.Blobs) { TBlobId blob(kvReq.Partition, reqBlob.Offset, reqBlob.PartNo, reqBlob.Count, reqBlob.InternalPartsCount); - { // there could be a new blob with same id (for big messages) - if (RemoveExists(ctx, blob)) { - reqData->RemovedBlobs.emplace_back(kvReq.Partition, reqBlob.Offset, reqBlob.PartNo, nullptr); - } + + // there could be a new blob with same id (for big messages) + if (RemoveExists(ctx, blob)) { + reqData.RemovedBlobs.emplace_back(kvReq.Partition, reqBlob.Offset, reqBlob.PartNo, nullptr); } - TCacheValue::TPtr cached(new TCacheValue(reqBlob.Value, ctx.SelfID, TAppData::TimeProvider->Now())); + auto cached = std::make_shared<TCacheValue>(reqBlob.Value, ctx.SelfID, TAppData::TimeProvider->Now()); TValueL1 valL1(cached, cached->DataSize(), TValueL1::SourceHead); Cache[blob] = valL1; // weak Counters.Inc(valL1); if (L1Strategy) L1Strategy->SaveHeadBlob(blob); - reqData->StoredBlobs.emplace_back(kvReq.Partition, reqBlob.Offset, reqBlob.PartNo, cached); + reqData.StoredBlobs.emplace_back(kvReq.Partition, reqBlob.Offset, reqBlob.PartNo, cached); LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Caching head blob in L1. Partition " << blob.Partition << " offset " << blob.Offset << " count " << blob.Count << " size " << reqBlob.Value.size() << " actorID " << ctx.SelfID); } + } - auto l2Request = MakeHolder<TEvPqCache::TEvCacheL2Request>(reqData.Release()); - ctx.Send(MakePersQueueL2CacheID(), l2Request.Release()); // -> L2 + TBlobId MakeBlobId(const TString& s) + { + if (s.length() == TKeyPrefix::MarkPosition()) { + TPartitionId partitionId; + partitionId.OriginalPartitionId = FromString<ui32>(s.data() + 1, 10); + partitionId.InternalPartitionId = partitionId.OriginalPartitionId; + return {partitionId, 0, 0, 0, 0}; + } else { + TKey key(s); + return {key.GetPartition(), key.GetOffset(), key.GetPartNo(), key.GetCount(), key.GetInternalPartsCount()}; + } + } + + void RenameBlobs(const TKvRequest& kvReq, TCacheL2Request& reqData, const TActorContext& ctx) + { + for (const auto& [oldKey, newKey] : kvReq.RenamedBlobs) { + TBlobId oldBlob = MakeBlobId(oldKey); + TBlobId newBlob = MakeBlobId(newKey); + if (RenameExists(ctx, oldBlob, newBlob)) { + reqData.RenamedBlobs.emplace_back(std::piecewise_construct, + std::make_tuple(oldBlob.Partition, oldBlob.Offset, oldBlob.PartNo, nullptr), + std::make_tuple(newBlob.Partition, newBlob.Offset, newBlob.PartNo, nullptr)); + + LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Renaming head blob in L1. Old partition " + << oldBlob.Partition << " old offset " << oldBlob.Offset << " old count " << oldBlob.Count + << " new partition " << newBlob.Partition << " new offset " << newBlob.Offset << " new count " << newBlob.Count + << " actorID " << ctx.SelfID); + } + } + } + + void DeleteBlobs(const TKvRequest& kvReq, TCacheL2Request& reqData, const TActorContext& ctx) + { + for (const auto& range : kvReq.DeletedBlobs) { + auto [lowerBound, upperBound] = MapSubrange(Cache, + MakeBlobId(range.Begin), range.IncludeBegin, + MakeBlobId(range.End), range.IncludeEnd); + + for (auto i = lowerBound; i != upperBound; ++i) { + const auto& [blob, value] = *i; + + reqData.RemovedBlobs.emplace_back(blob.Partition, blob.Offset, blob.PartNo, nullptr); + Counters.Dec(value); + + LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Deleting head blob in L1. Partition " + << blob.Partition << " offset " << blob.Offset << " count " << blob.Count + << " actorID " << ctx.SelfID); + } + + Cache.erase(lowerBound, upperBound); + } } void SavePrefetchBlobs(const TActorContext& ctx, const TKvRequest& kvReq, const TVector<bool>& store) @@ -461,6 +544,26 @@ namespace NPQ { TValueL1 value; return CheckExists(ctx, blob, value, true); } + + bool RenameExists(const TActorContext& ctx, const TBlobId& oldBlob, const TBlobId& newBlob) + { + Y_UNUSED(ctx); + + auto it = Cache.find(oldBlob); + if (it == Cache.end()) { + return false; + } + + TValueL1 value = it->second; + Cache.erase(it); + + Cache[newBlob] = value; + Counters.Inc(value); + if (L1Strategy) + L1Strategy->SaveHeadBlob(newBlob); + + return true; + } }; } //NPQ diff --git a/ydb/core/persqueue/map_subrange.h b/ydb/core/persqueue/map_subrange.h new file mode 100644 index 0000000000..1ab83f0076 --- /dev/null +++ b/ydb/core/persqueue/map_subrange.h @@ -0,0 +1,24 @@ +namespace NKikimr::NPQ { + +template<class M, class I = typename M::const_iterator> +std::pair<I, I> MapSubrange(const M& map, + const typename M::key_type& begin, bool includeBegin, + const typename M::key_type& end, bool includeEnd) +{ + if (map.empty()) { + return {map.end(), map.end()}; + } + + if (begin == end) { + if ((includeBegin != includeEnd) || !includeBegin) { + return {map.end(), map.end()}; + } + } + + auto leftBorder = includeBegin ? map.lower_bound(begin) : map.upper_bound(begin); + auto rightBorder = includeEnd ? map.upper_bound(end) : map.lower_bound(end); + + return {leftBorder, rightBorder}; +} + +} diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index b07e9e40a9..657402d367 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -1827,7 +1827,7 @@ void TPartition::ProcessTxsAndUserActs(const TActorContext& ctx) AddCmdDeleteRangeForAllKeys(*PersistRequest); - ctx.Send(Tablet, PersistRequest.Release(), 0, 0, PersistRequestSpan.GetTraceId()); + ctx.Send(BlobCache, PersistRequest.Release(), 0, 0, PersistRequestSpan.GetTraceId()); PersistRequest = nullptr; CurrentPersistRequestSpan = std::move(PersistRequestSpan); PersistRequestSpan = NWilson::TSpan(); @@ -1993,7 +1993,9 @@ void TPartition::RunPersist() { //haveChanges = true; } - TryAddDeleteHeadKeysToPersistRequest(); + if (TryAddDeleteHeadKeysToPersistRequest()) { + haveChanges = true; + } if (haveChanges || TxIdHasChanged || !AffectedUsers.empty() || ChangeConfig) { WriteCycleStartTime = now; @@ -2055,8 +2057,10 @@ void TPartition::RunPersist() { PersistRequest = nullptr; } -void TPartition::TryAddDeleteHeadKeysToPersistRequest() +bool TPartition::TryAddDeleteHeadKeysToPersistRequest() { + bool haveChanges = !DeletedKeys.empty(); + while (!DeletedKeys.empty()) { auto& k = DeletedKeys.back(); @@ -2070,6 +2074,8 @@ void TPartition::TryAddDeleteHeadKeysToPersistRequest() DeletedKeys.pop_back(); } + + return haveChanges; } void TPartition::DumpKeyValueRequest(const NKikimrClient::TKeyValueRequest& request) diff --git a/ydb/core/persqueue/partition.h b/ydb/core/persqueue/partition.h index 9ecc0bac0e..197ef34f61 100644 --- a/ydb/core/persqueue/partition.h +++ b/ydb/core/persqueue/partition.h @@ -982,7 +982,7 @@ private: size_t WriteNewSizeFromSupportivePartitions = 0; - void TryAddDeleteHeadKeysToPersistRequest(); + bool TryAddDeleteHeadKeysToPersistRequest(); void DumpKeyValueRequest(const NKikimrClient::TKeyValueRequest& request); TBlobKeyTokenPtr MakeBlobKeyToken(const TString& key); diff --git a/ydb/core/persqueue/partition_id.h b/ydb/core/persqueue/partition_id.h index 0c1dbb8d3a..1582a71592 100644 --- a/ydb/core/persqueue/partition_id.h +++ b/ydb/core/persqueue/partition_id.h @@ -43,6 +43,15 @@ public: (InternalPartitionId == rhs.InternalPartitionId); } + bool IsLess(const TPartitionId& rhs) const + { + auto makeTuple = [](const TPartitionId& v) { + return std::make_tuple(v.OriginalPartitionId, v.WriteId, v.InternalPartitionId); + }; + + return makeTuple(*this) < makeTuple(rhs); + } + void ToStream(IOutputStream& s) const { if (WriteId.Defined()) { @@ -76,6 +85,12 @@ bool operator==(const TPartitionId& lhs, const TPartitionId& rhs) } inline +bool operator<(const TPartitionId& lhs, const TPartitionId& rhs) +{ + return lhs.IsLess(rhs); +} + +inline IOutputStream& operator<<(IOutputStream& s, const TPartitionId& v) { v.ToStream(s); diff --git a/ydb/core/persqueue/partition_write.cpp b/ydb/core/persqueue/partition_write.cpp index fce4a23606..c590b3b6e4 100644 --- a/ydb/core/persqueue/partition_write.cpp +++ b/ydb/core/persqueue/partition_write.cpp @@ -1069,7 +1069,6 @@ void TPartition::AddCmdWrite(const std::optional<TPartitionedBlob::TFormedBlobIn TKey resKey = newWrite->Key; resKey.SetType(TKeyPrefix::TypeData); - write->SetKeyToCache(resKey.ToString()); WriteCycleSize += newWrite->Value.size(); } @@ -1486,9 +1485,6 @@ void TPartition::AddNewWriteBlob(std::pair<TKey, ui32>& res, TEvKeyValue::TEvReq write->SetKey(key.Data(), key.Size()); write->SetValue(valueD); - if (!key.IsHead()) - write->SetKeyToCache(key.Data(), key.Size()); - bool isInline = key.IsHead() && valueD.size() < MAX_INLINE_SIZE; if (isInline) diff --git a/ydb/core/persqueue/pq_l2_cache.cpp b/ydb/core/persqueue/pq_l2_cache.cpp index 404f45dbaa..d8ca1cb39e 100644 --- a/ydb/core/persqueue/pq_l2_cache.cpp +++ b/ydb/core/persqueue/pq_l2_cache.cpp @@ -34,6 +34,7 @@ void TPersQueueCacheL2::Handle(TEvPqCache::TEvCacheL2Request::TPtr& ev, const TA TouchBlobs(ctx, tabletId, request->ExpectedBlobs, false); RemoveBlobs(ctx, tabletId, request->RemovedBlobs); RegretBlobs(ctx, tabletId, request->MissedBlobs); + RenameBlobs(ctx, tabletId, request->RenamedBlobs); THashMap<TKey, TCacheValue::TPtr> evicted; AddBlobs(ctx, tabletId, request->StoredBlobs, evicted); @@ -46,7 +47,7 @@ void TPersQueueCacheL2::SendResponses(const TActorContext& ctx, const THashMap<T TInstant now = TAppData::TimeProvider->Now(); THashMap<TActorId, THolder<TCacheL2Response>> responses; - for (auto rm : evictedBlobs) { + for (const auto& rm : evictedBlobs) { const TKey& key = rm.first; TCacheValue::TPtr evicted = rm.second; @@ -57,7 +58,7 @@ void TPersQueueCacheL2::SendResponses(const TActorContext& ctx, const THashMap<T } Y_ABORT_UNLESS(key.TabletId == resp->TabletId, "PQ L2. Multiple topics in one PQ tablet."); - resp->Removed.push_back({key.Partition, key.Offset, key.PartNo, evicted}); + resp->Removed.emplace_back(key.Partition, key.Offset, key.PartNo, evicted); RetentionTime = now - evicted->GetAccessTime(); if (RetentionTime < KeepTime) @@ -72,6 +73,13 @@ void TPersQueueCacheL2::SendResponses(const TActorContext& ctx, const THashMap<T } } +void TPersQueueCacheL2::Handle(TEvPqCache::TEvCacheKeysRequest::TPtr& ev, const TActorContext& ctx) +{ + auto response = MakeHolder<TEvPqCache::TEvCacheKeysResponse>(); + response->RenamedKeys = RenamedKeys; + ctx.Send(ev->Sender, response.Release()); +} + /// @return outRemoved - map of evicted items. L1 should be noticed about them void TPersQueueCacheL2::AddBlobs(const TActorContext& ctx, ui64 tabletId, const TVector<TCacheBlobL2>& blobs, THashMap<TKey, TCacheValue::TPtr>& outEvicted) @@ -156,6 +164,28 @@ void TPersQueueCacheL2::RemoveBlobs(const TActorContext& ctx, ui64 tabletId, con } } +void TPersQueueCacheL2::RenameBlobs(const TActorContext& ctx, ui64 tabletId, + const TVector<std::pair<TCacheBlobL2, TCacheBlobL2>>& blobs) +{ + RenamedKeys += blobs.size(); + + for (const auto& [oldBlob, newBlob] : blobs) { + TKey oldKey(tabletId, oldBlob); + auto it = Cache.FindWithoutPromote(oldKey); + if (it == Cache.End()) { + continue; + } + + TKey newKey(tabletId, newBlob); + Cache.Insert(newKey, *it); + Cache.Erase(it); + + LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "PQ Cache (L2). Renamed. Tablet '" << tabletId + << "' old partition " << oldBlob.Partition << " old offset " << oldBlob.Offset + << " new partition " << newBlob.Partition << " new offset " << newBlob.Offset); + } +} + void TPersQueueCacheL2::TouchBlobs(const TActorContext& ctx, ui64 tabletId, const TVector<TCacheBlobL2>& blobs, bool isHit) { TInstant now = TAppData::TimeProvider->Now(); diff --git a/ydb/core/persqueue/pq_l2_cache.h b/ydb/core/persqueue/pq_l2_cache.h index 42aa184814..3747da4d1e 100644 --- a/ydb/core/persqueue/pq_l2_cache.h +++ b/ydb/core/persqueue/pq_l2_cache.h @@ -93,6 +93,7 @@ private: switch (ev->GetTypeRewrite()) { HFuncTraced(TEvents::TEvPoisonPill, Handle); HFuncTraced(TEvPqCache::TEvCacheL2Request, Handle); + HFuncTraced(TEvPqCache::TEvCacheKeysRequest, Handle); HFuncTraced(NMon::TEvHttpInfo, Handle); default: break; @@ -110,11 +111,15 @@ private: void Handle(TEvPqCache::TEvCacheL2Request::TPtr& ev, const TActorContext& ctx); void SendResponses(const TActorContext& ctx, const THashMap<TKey, TCacheValue::TPtr>& evicted); + void Handle(TEvPqCache::TEvCacheKeysRequest::TPtr& ev, const TActorContext& ctx); + void AddBlobs(const TActorContext& ctx, ui64 tabletId, const TVector<TCacheBlobL2>& blobs, THashMap<TKey, TCacheValue::TPtr>& outEvicted); void RemoveBlobs(const TActorContext& ctx, ui64 tabletId, const TVector<TCacheBlobL2>& blobs); void TouchBlobs(const TActorContext& ctx, ui64 tabletId, const TVector<TCacheBlobL2>& blobs, bool isHit = true); void RegretBlobs(const TActorContext& ctx, ui64 tabletId, const TVector<TCacheBlobL2>& blobs); + void RenameBlobs(const TActorContext& ctx, ui64 tabletId, + const TVector<std::pair<TCacheBlobL2, TCacheBlobL2>>& blobs); static ui64 ClampMinSize(ui64 maxSize) { static const ui64 MIN_SIZE = 32_MB; @@ -130,6 +135,8 @@ private: TL2Counters Counters; TString HttpForm() const; + + size_t RenamedKeys = 0; }; } // NPQ diff --git a/ydb/core/persqueue/pq_l2_service.h b/ydb/core/persqueue/pq_l2_service.h index d48f303eec..a02b2d9bfb 100644 --- a/ydb/core/persqueue/pq_l2_service.h +++ b/ydb/core/persqueue/pq_l2_service.h @@ -83,6 +83,7 @@ struct TCacheL2Request { TVector<TCacheBlobL2> RemovedBlobs; TVector<TCacheBlobL2> ExpectedBlobs; TVector<TCacheBlobL2> MissedBlobs; + TVector<std::pair<TCacheBlobL2, TCacheBlobL2>> RenamedBlobs; explicit TCacheL2Request(ui64 tabletId) : TabletId(tabletId) @@ -102,7 +103,8 @@ struct TEvPqCache { enum EEv { EvCacheRequest = EventSpaceBegin(TKikimrEvents::ES_PQ_L2_CACHE), EvCacheResponse, - + EvCacheKeysRequest, + EvCacheKeysResponse, EvEnd }; @@ -123,6 +125,13 @@ struct TEvPqCache { : Data(data) {} }; + + struct TEvCacheKeysRequest : TEventLocal<TEvCacheKeysRequest, EvCacheKeysRequest> { + }; + + struct TEvCacheKeysResponse : TEventLocal<TEvCacheKeysResponse, EvCacheKeysResponse> { + size_t RenamedKeys = 0; + }; }; } // NPQ diff --git a/ydb/core/persqueue/read.h b/ydb/core/persqueue/read.h index 54986321af..365220586a 100644 --- a/ydb/core/persqueue/read.h +++ b/ydb/core/persqueue/read.h @@ -264,15 +264,32 @@ namespace NPQ { auto& srcRequest = ev->Get()->Record; TKvRequest kvReq(TKvRequest::TypeWrite, ev->Sender, Max<ui64>(), TPartitionId(Max<ui32>())); + + SaveCmdWrite(srcRequest, kvReq, ctx); + SaveCmdRename(srcRequest, kvReq, ctx); + SaveCmdDelete(srcRequest, kvReq, ctx); + + ui64 cookie = SaveKvRequest(std::move(kvReq)); + + auto request = MakeHolder<TEvKeyValue::TEvRequest>(); + request->Record = std::move(srcRequest); + request->Record.SetCookie(cookie); + + ctx.Send(Tablet, request.Release(), 0, 0, std::move(ev->TraceId)); // -> KV + } + + void SaveCmdWrite(const NKikimrClient::TKeyValueRequest& srcRequest, TKvRequest& kvReq, const TActorContext& ctx) + { kvReq.Blobs.reserve(srcRequest.CmdWriteSize()); for (ui32 i = 0; i < srcRequest.CmdWriteSize(); ++i) { const auto& cmd = srcRequest.GetCmdWrite(i); - if (cmd.HasKeyToCache()) { - const TString& strKey = cmd.GetKeyToCache(); - Y_ABORT_UNLESS(strKey.size() == TKey::KeySize(), "Unexpected key size: %" PRIu64, strKey.size()); + const TString& strKey = cmd.GetKey(); + if (IsDataKey(strKey)) { + Y_ABORT_UNLESS((strKey.size() >= TKey::KeySize()) && (strKey.size() - TKey::KeySize() <= 1), + "Unexpected key size: %" PRIu64 " (%s)", + strKey.size(), strKey.data()); TKey key(strKey); - Y_ABORT_UNLESS(!key.IsHead()); const TString& value = cmd.GetValue(); kvReq.Partition = key.GetPartition(); @@ -285,14 +302,49 @@ namespace NPQ { kvReq.MetadataWritesCount++; } } + } - ui64 cookie = SaveKvRequest(std::move(kvReq)); + void SaveCmdRename(const NKikimrClient::TKeyValueRequest& srcRequest, TKvRequest& kvReq, const TActorContext& ctx) + { + kvReq.RenamedBlobs.reserve(srcRequest.CmdRenameSize()); - auto request = MakeHolder<TEvKeyValue::TEvRequest>(); - request->Record = std::move(srcRequest); - request->Record.SetCookie(cookie); + for (ui32 i = 0; i < srcRequest.CmdRenameSize(); ++i) { + const auto& cmd = srcRequest.GetCmdRename(i); + if (!IsDataKey(cmd.GetOldKey()) || !IsDataKey(cmd.GetNewKey())) { + continue; + } + kvReq.RenamedBlobs.emplace_back(cmd.GetOldKey(), cmd.GetNewKey()); - ctx.Send(Tablet, request.Release(), 0, 0, std::move(ev->TraceId)); // -> KV + LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "CacheProxy. Rename blob from " << cmd.GetOldKey() << " to " << cmd.GetNewKey()); + } + } + + void SaveCmdDelete(const NKikimrClient::TKeyValueRequest& srcRequest, TKvRequest& kvReq, const TActorContext& ctx) + { + kvReq.DeletedBlobs.reserve(srcRequest.CmdDeleteRangeSize()); + + for (ui32 i = 0; i < srcRequest.CmdDeleteRangeSize(); ++i) { + const auto& cmd = srcRequest.GetCmdDeleteRange(i); + const auto& range = cmd.GetRange(); + if (!IsDataKey(range.GetFrom()) || !IsDataKey(range.GetTo())) { + continue; + } + kvReq.DeletedBlobs.emplace_back(range.GetFrom(), range.GetIncludeFrom(), + range.GetTo(), range.GetIncludeTo()); + + LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, + "CacheProxy. Delete blobs from " << + range.GetFrom() << "(" << (range.GetIncludeFrom() ? '+' : '-') << ") to " << + range.GetTo() << "(" << (range.GetIncludeTo() ? '+' : '-') << ")"); + } + } + + bool IsDataKey(const TString& key) const { + if (key.empty()) { + return false; + } + const char type = std::tolower(key.front()); + return type == TKeyPrefix::TypeData; // TypeData || ServiceTypeData } void Handle(TEvPqCache::TEvCacheL2Response::TPtr& ev, const TActorContext& ctx) diff --git a/ydb/core/persqueue/ut/cache_eviction_ut.cpp b/ydb/core/persqueue/ut/cache_eviction_ut.cpp new file mode 100644 index 0000000000..77b6273723 --- /dev/null +++ b/ydb/core/persqueue/ut/cache_eviction_ut.cpp @@ -0,0 +1,177 @@ +#include <library/cpp/testing/unittest/registar.h> +#include <ydb/core/persqueue/map_subrange.h> + +Y_UNIT_TEST_SUITE(CacheEviction) { + +template<class K> +struct TKeyRange { + K Begin; + bool IncludeBegin; + K End; + bool IncludeEnd; +}; + +template<class K> +TKeyRange<K> OpenOpen(K begin, K end) +{ + return {.Begin=begin, .IncludeBegin=false, .End=end, .IncludeEnd=false}; +} + +template<class K> +TKeyRange<K> OpenClose(K begin, K end) +{ + return {.Begin=begin, .IncludeBegin=false, .End=end, .IncludeEnd=true}; +} + +template<class K> +TKeyRange<K> CloseOpen(K begin, K end) +{ + return {.Begin=begin, .IncludeBegin=true, .End=end, .IncludeEnd=false}; +} + +template<class K> +TKeyRange<K> CloseClose(K begin, K end) +{ + return {.Begin=begin, .IncludeBegin=true, .End=end, .IncludeEnd=true}; +} + +void CheckDeleteKeys(const TMap<int, int>& map, const TKeyRange<int>& range, const TSet<int>& expectedKeys) +{ + auto [lowerBound, upperBound] = + NKikimr::NPQ::MapSubrange(map, + range.Begin, range.IncludeBegin, + range.End, range.IncludeEnd); + + TSet<int> remainKeys; + for (const auto& [key, _] : map) { + remainKeys.insert(key); + } + + for (; lowerBound != upperBound; ++lowerBound) { + remainKeys.erase(lowerBound->first); + } + + UNIT_ASSERT_VALUES_EQUAL(remainKeys, expectedKeys); +} + +Y_UNIT_TEST(DeleteKeys) { + const TMap<int, int> map{ + {11, 1}, + {13, 1}, + {15, 1}, + {17, 1}, + {19, 1} + }; + + CheckDeleteKeys(map, OpenOpen ( 2, 10), {11, 13, 15, 17, 19}); + CheckDeleteKeys(map, OpenClose ( 2, 10), {11, 13, 15, 17, 19}); + CheckDeleteKeys(map, CloseOpen ( 2, 10), {11, 13, 15, 17, 19}); + CheckDeleteKeys(map, CloseClose( 2, 10), {11, 13, 15, 17, 19}); + + CheckDeleteKeys(map, OpenOpen ( 2, 11), {11, 13, 15, 17, 19}); + CheckDeleteKeys(map, OpenClose ( 2, 11), { 13, 15, 17, 19}); + CheckDeleteKeys(map, CloseOpen ( 2, 11), {11, 13, 15, 17, 19}); + CheckDeleteKeys(map, CloseClose( 2, 11), { 13, 15, 17, 19}); + + CheckDeleteKeys(map, OpenOpen ( 2, 14), { 15, 17, 19}); + CheckDeleteKeys(map, OpenClose ( 2, 14), { 15, 17, 19}); + CheckDeleteKeys(map, CloseOpen ( 2, 14), { 15, 17, 19}); + CheckDeleteKeys(map, CloseClose( 2, 14), { 15, 17, 19}); + + CheckDeleteKeys(map, OpenOpen ( 2, 15), { 15, 17, 19}); + CheckDeleteKeys(map, OpenClose ( 2, 15), { 17, 19}); + CheckDeleteKeys(map, CloseOpen ( 2, 15), { 15, 17, 19}); + CheckDeleteKeys(map, CloseClose( 2, 15), { 17, 19}); + + CheckDeleteKeys(map, OpenOpen ( 2, 19), { 19}); + CheckDeleteKeys(map, OpenClose ( 2, 19), { }); + CheckDeleteKeys(map, CloseOpen ( 2, 19), { 19}); + CheckDeleteKeys(map, CloseClose( 2, 19), { }); + + CheckDeleteKeys(map, OpenOpen ( 2, 21), { }); + CheckDeleteKeys(map, OpenClose ( 2, 21), { }); + CheckDeleteKeys(map, CloseOpen ( 2, 21), { }); + CheckDeleteKeys(map, CloseClose( 2, 21), { }); + + CheckDeleteKeys(map, OpenOpen (11, 11), {11, 13, 15, 17, 19}); + CheckDeleteKeys(map, OpenClose (11, 11), {11, 13, 15, 17, 19}); + CheckDeleteKeys(map, CloseOpen (11, 11), {11, 13, 15, 17, 19}); + CheckDeleteKeys(map, CloseClose(11, 11), { 13, 15, 17, 19}); + + CheckDeleteKeys(map, OpenOpen (11, 14), {11, 15, 17, 19}); + CheckDeleteKeys(map, OpenClose (11, 14), {11, 15, 17, 19}); + CheckDeleteKeys(map, CloseOpen (11, 14), { 15, 17, 19}); + CheckDeleteKeys(map, CloseClose(11, 14), { 15, 17, 19}); + + CheckDeleteKeys(map, OpenOpen (11, 15), {11, 15, 17, 19}); + CheckDeleteKeys(map, OpenClose (11, 15), {11, 17, 19}); + CheckDeleteKeys(map, CloseOpen (11, 15), { 15, 17, 19}); + CheckDeleteKeys(map, CloseClose(11, 15), { 17, 19}); + + CheckDeleteKeys(map, OpenOpen (11, 19), {11, 19}); + CheckDeleteKeys(map, OpenClose (11, 19), {11 }); + CheckDeleteKeys(map, CloseOpen (11, 19), { 19}); + CheckDeleteKeys(map, CloseClose(11, 19), { }); + + CheckDeleteKeys(map, OpenOpen (11, 21), {11 }); + CheckDeleteKeys(map, OpenClose (11, 21), {11 }); + CheckDeleteKeys(map, CloseOpen (11, 21), { }); + CheckDeleteKeys(map, CloseClose(11, 21), { }); + + CheckDeleteKeys(map, OpenOpen (12, 14), {11, 15, 17, 19}); + CheckDeleteKeys(map, OpenClose (12, 14), {11, 15, 17, 19}); + CheckDeleteKeys(map, CloseOpen (12, 14), {11, 15, 17, 19}); + CheckDeleteKeys(map, CloseClose(12, 14), {11, 15, 17, 19}); + + CheckDeleteKeys(map, OpenOpen (14, 14), {11, 13, 15, 17, 19}); + CheckDeleteKeys(map, OpenClose (14, 14), {11, 13, 15, 17, 19}); + CheckDeleteKeys(map, CloseOpen (14, 14), {11, 13, 15, 17, 19}); + CheckDeleteKeys(map, CloseClose(14, 14), {11, 13, 15, 17, 19}); + + CheckDeleteKeys(map, OpenOpen (14, 15), {11, 13, 15, 17, 19}); + CheckDeleteKeys(map, OpenClose (14, 15), {11, 13, 17, 19}); + CheckDeleteKeys(map, CloseOpen (14, 15), {11, 13, 15, 17, 19}); + CheckDeleteKeys(map, CloseClose(14, 15), {11, 13, 17, 19}); + + CheckDeleteKeys(map, OpenOpen (14, 19), {11, 13, 19}); + CheckDeleteKeys(map, OpenClose (14, 19), {11, 13 }); + CheckDeleteKeys(map, CloseOpen (14, 19), {11, 13, 19}); + CheckDeleteKeys(map, CloseClose(14, 19), {11, 13 }); + + CheckDeleteKeys(map, OpenOpen (14, 21), {11, 13 }); + CheckDeleteKeys(map, OpenClose (14, 21), {11, 13 }); + CheckDeleteKeys(map, CloseOpen (14, 21), {11, 13 }); + CheckDeleteKeys(map, CloseClose(14, 21), {11, 13 }); + + CheckDeleteKeys(map, OpenOpen (15, 15), {11, 13, 15, 17, 19}); + CheckDeleteKeys(map, OpenClose (15, 15), {11, 13, 15, 17, 19}); + CheckDeleteKeys(map, CloseOpen (15, 15), {11, 13, 15, 17, 19}); + CheckDeleteKeys(map, CloseClose(15, 15), {11, 13, 17, 19}); + + CheckDeleteKeys(map, OpenOpen (15, 19), {11, 13, 15, 19}); + CheckDeleteKeys(map, OpenClose (15, 19), {11, 13, 15, }); + CheckDeleteKeys(map, CloseOpen (15, 19), {11, 13, 19}); + CheckDeleteKeys(map, CloseClose(15, 19), {11, 13 }); + + CheckDeleteKeys(map, OpenOpen (15, 21), {11, 13, 15 }); + CheckDeleteKeys(map, OpenClose (15, 21), {11, 13, 15 }); + CheckDeleteKeys(map, CloseOpen (15, 21), {11, 13 }); + CheckDeleteKeys(map, CloseClose(15, 21), {11, 13 }); + + CheckDeleteKeys(map, OpenOpen (19, 19), {11, 13, 15, 17, 19}); + CheckDeleteKeys(map, OpenClose (19, 19), {11, 13, 15, 17, 19}); + CheckDeleteKeys(map, CloseOpen (19, 19), {11, 13, 15, 17, 19}); + CheckDeleteKeys(map, CloseClose(19, 19), {11, 13, 15, 17 }); + + CheckDeleteKeys(map, OpenOpen (19, 21), {11, 13, 15, 17, 19}); + CheckDeleteKeys(map, OpenClose (19, 21), {11, 13, 15, 17, 19}); + CheckDeleteKeys(map, CloseOpen (19, 21), {11, 13, 15, 17 }); + CheckDeleteKeys(map, CloseClose(19, 21), {11, 13, 15, 17 }); + + CheckDeleteKeys(map, OpenOpen (21, 21), {11, 13, 15, 17, 19}); + CheckDeleteKeys(map, OpenClose (21, 21), {11, 13, 15, 17, 19}); + CheckDeleteKeys(map, CloseOpen (21, 21), {11, 13, 15, 17, 19}); + CheckDeleteKeys(map, CloseClose(21, 21), {11, 13, 15, 17, 19}); +} + +} diff --git a/ydb/core/persqueue/ut/partition_ut.cpp b/ydb/core/persqueue/ut/partition_ut.cpp index d01b686442..51aefbc2fb 100644 --- a/ydb/core/persqueue/ut/partition_ut.cpp +++ b/ydb/core/persqueue/ut/partition_ut.cpp @@ -1149,10 +1149,10 @@ void TPartitionFixture::ShadowPartitionCountersTest(bool isFirstClass) { auto& counterData = meta.GetCounterData(); UNIT_ASSERT_VALUES_EQUAL(counterData.GetMessagesWrittenTotal(), cookie - 1); UNIT_ASSERT_VALUES_EQUAL(counterData.GetMessagesWrittenGrpc(),isFirstClass ? cookie - 1 : 0); - UNIT_ASSERT(counterData.GetBytesWrittenUncompressed() > currUncSize); + UNIT_ASSERT(counterData.GetBytesWrittenUncompressed() >= currUncSize); currUncSize = counterData.GetBytesWrittenUncompressed(); UNIT_ASSERT_VALUES_EQUAL(counterData.GetBytesWrittenGrpc(), isFirstClass ? counterData.GetBytesWrittenTotal() : 0); - UNIT_ASSERT(counterData.GetBytesWrittenTotal() > currTotalSize); + UNIT_ASSERT(counterData.GetBytesWrittenTotal() >= currTotalSize); currTotalSize = counterData.GetBytesWrittenTotal(); if (cookie == 11) { diff --git a/ydb/core/persqueue/ut/ya.make b/ydb/core/persqueue/ut/ya.make index a2c6314bfe..54749543f8 100644 --- a/ydb/core/persqueue/ut/ya.make +++ b/ydb/core/persqueue/ut/ya.make @@ -49,6 +49,7 @@ SRCS( fetch_request_ut.cpp utils_ut.cpp list_all_topics_ut.cpp + cache_eviction_ut.cpp ) RESOURCE( diff --git a/ydb/core/testlib/test_pq_client.h b/ydb/core/testlib/test_pq_client.h index 48608698f4..a04a1f9aae 100644 --- a/ydb/core/testlib/test_pq_client.h +++ b/ydb/core/testlib/test_pq_client.h @@ -5,6 +5,7 @@ #include <ydb/core/persqueue/cluster_tracker.h> #include <ydb/core/protos/flat_tx_scheme.pb.h> #include <ydb/core/mind/address_classification/net_classifier.h> +#include <ydb/core/keyvalue/keyvalue_events.h> #include <ydb/public/api/protos/draft/persqueue_error_codes.pb.h> #include <ydb/public/lib/deprecated/kicli/kicli.h> #include <ydb-cpp-sdk/client/driver/driver.h> @@ -860,6 +861,44 @@ public: } } + TVector<TString> GetPQTabletKeys(TTestActorRuntime* runtime, const TString& topic, ui32 partitionId) { + ui64 tabletId = Max<ui64>(); + + auto res = Ls("/Root/PQ/" + topic); + const auto& pq = res->Record.GetPathDescription().GetPersQueueGroup(); + for (ui32 i = 0; i < pq.PartitionsSize(); ++i) { + const auto& partition = pq.GetPartitions(i); + if (partition.GetPartitionId() == partitionId) { + tabletId = partition.GetTabletId(); + break; + } + } + + Y_ABORT_UNLESS(tabletId != Max<ui64>()); + + auto request = MakeHolder<TEvKeyValue::TEvRequest>(); + auto* cmd = request->Record.AddCmdReadRange(); + auto* range = cmd->MutableRange(); + range->SetFrom("\x00"); + range->SetTo("\xFF"); + + TActorId sender = runtime->AllocateEdgeActor(); + ForwardToTablet(*runtime, tabletId, sender, request.Release(), 0); + auto response = runtime->GrabEdgeEvent<TEvKeyValue::TEvResponse>(); + + TVector<TString> keys; + + for (size_t i = 0; i < response->Record.ReadRangeResultSize(); ++i) { + const auto& result = response->Record.GetReadRangeResult(i); + for (size_t j = 0; j < result.PairSize(); ++j) { + const auto& pair = result.GetPair(j); + keys.push_back(pair.GetKey()); + } + } + + return keys; + } + bool IsTopicDeleted(const TString& name) { auto response = RequestTopicMetadata(name); diff --git a/ydb/public/sdk/cpp/src/client/topic/ut/topic_to_table_ut.cpp b/ydb/public/sdk/cpp/src/client/topic/ut/topic_to_table_ut.cpp index 776e353cc1..a057b113f8 100644 --- a/ydb/public/sdk/cpp/src/client/topic/ut/topic_to_table_ut.cpp +++ b/ydb/public/sdk/cpp/src/client/topic/ut/topic_to_table_ut.cpp @@ -9,6 +9,7 @@ #include <ydb/core/persqueue/key.h> #include <ydb/core/persqueue/blob.h> #include <ydb/core/persqueue/events/global.h> +#include <ydb/core/persqueue/pq_l2_service.h> #include <ydb/core/tx/long_tx_service/public/events.h> #include <ydb/core/persqueue/ut/common/autoscaling_ut_common.h> @@ -237,6 +238,8 @@ protected: virtual bool GetEnableHtapTx() const; virtual bool GetAllowOlapDataQuery() const; + size_t GetPQCacheRenameKeysCount(); + private: template<class E> E ReadEvent(TTopicReadSessionPtr reader, NTable::TTransaction& tx); @@ -248,6 +251,8 @@ private: ui32 partition); std::vector<std::string> GetTabletKeys(const TActorId& actorId, ui64 tabletId); + std::vector<std::string> GetPQTabletDataKeys(const TActorId& actorId, + ui64 tabletId); NPQ::TWriteId GetTransactionWriteId(const TActorId& actorId, ui64 tabletId); void SendLongTxLockStatus(const TActorId& actorId, @@ -1084,6 +1089,43 @@ std::vector<std::string> TFixture::GetTabletKeys(const TActorId& actorId, return keys; } +std::vector<std::string> TFixture::GetPQTabletDataKeys(const TActorId& actorId, + ui64 tabletId) +{ + using namespace NKikimr::NPQ; + + std::vector<std::string> keys; + + for (const auto& key : GetTabletKeys(actorId, tabletId)) { + if (key.empty() || + ((std::tolower(key.front()) != TKeyPrefix::TypeData) && + (std::tolower(key.front()) != TKeyPrefix::TypeTmpData))) { + continue; + } + + keys.push_back(key); + } + + return keys; +} + +size_t TFixture::GetPQCacheRenameKeysCount() +{ + using namespace NKikimr::NPQ; + + auto& runtime = Setup->GetRuntime(); + TActorId edge = runtime.AllocateEdgeActor(); + + auto request = MakeHolder<TEvPqCache::TEvCacheKeysRequest>(); + + runtime.Send(MakePersQueueL2CacheID(), edge, request.Release()); + + TAutoPtr<IEventHandle> handle; + auto* result = runtime.GrabEdgeEvent<TEvPqCache::TEvCacheKeysResponse>(handle); + + return result->RenamedKeys; +} + void TFixture::RestartLongTxService() { auto& runtime = Setup->GetRuntime(); @@ -2579,6 +2621,55 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_48, TFixture) UNIT_ASSERT_GT(topicDescription.GetTotalPartitionsCount(), 2); } +Y_UNIT_TEST_F(WriteToTopic_Demo_50, TFixture) +{ + // We write to the topic in the transaction. When a transaction is committed, the keys in the blob + // cache are renamed. + CreateTopic("topic_A", TEST_CONSUMER); + CreateTopic("topic_B", TEST_CONSUMER); + + TString message(128_KB, 'x'); + + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID_1, message); + WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID_1); + + auto session = CreateTableSession(); + + // tx #1 + // After the transaction commit, there will be no large blobs in the batches. The number of renames + // will not change in the cache. + auto tx = BeginTx(session); + + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID_2, message, &tx); + WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID_3, message, &tx); + + UNIT_ASSERT_VALUES_EQUAL(GetPQCacheRenameKeysCount(), 0); + + CommitTx(tx, EStatus::SUCCESS); + + Sleep(TDuration::Seconds(5)); + + UNIT_ASSERT_VALUES_EQUAL(GetPQCacheRenameKeysCount(), 0); + + // tx #2 + // After the commit, the party will rename one big blob + tx = BeginTx(session); + + for (unsigned i = 0; i < 80; ++i) { + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID_2, message, &tx); + } + + WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID_3, message, &tx); + + UNIT_ASSERT_VALUES_EQUAL(GetPQCacheRenameKeysCount(), 0); + + CommitTx(tx, EStatus::SUCCESS); + + Sleep(TDuration::Seconds(5)); + + UNIT_ASSERT_VALUES_EQUAL(GetPQCacheRenameKeysCount(), 1); +} + class TFixtureSinks : public TFixture { protected: void CreateRowTable(const TString& path); diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp index 1b2fefc976..81ab1f45f9 100644 --- a/ydb/services/persqueue_v1/persqueue_ut.cpp +++ b/ydb/services/persqueue_v1/persqueue_ut.cpp @@ -2839,10 +2839,10 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { ui32 fromDisk = info0.BlobsFromDisk + info16.BlobsFromDisk; ui32 fromCache = info0.BlobsFromCache + info16.BlobsFromCache; - UNIT_ASSERT(fromDisk > 0); - UNIT_ASSERT(fromDisk < 5); - UNIT_ASSERT(fromCache > 0); - UNIT_ASSERT(fromCache < 5); + UNIT_ASSERT_GE(fromDisk, 0); + UNIT_ASSERT_LE(fromDisk, 6); + UNIT_ASSERT_GT(fromCache, 0); + UNIT_ASSERT_LE(fromCache, 6); } Y_UNIT_TEST(CacheHead) { |