aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlek5andr-Kotov <akotov@ydb.tech>2025-02-10 20:05:47 +0300
committerGitHub <noreply@github.com>2025-02-10 20:05:47 +0300
commitd368f28d37310c61daf28813f016dc6f9bfafecd (patch)
tree86e0aee3f96d69106191d4cca95e54da79180bca
parentad3a69baf9c00f3dc48aa33bf820439c0663e7ff (diff)
downloadydb-d368f28d37310c61daf28813f016dc6f9bfafecd.tar.gz
The keys in the block cache (#13553)
-rw-r--r--ydb/core/persqueue/cache_eviction.h127
-rw-r--r--ydb/core/persqueue/map_subrange.h24
-rw-r--r--ydb/core/persqueue/partition.cpp12
-rw-r--r--ydb/core/persqueue/partition.h2
-rw-r--r--ydb/core/persqueue/partition_id.h15
-rw-r--r--ydb/core/persqueue/partition_write.cpp4
-rw-r--r--ydb/core/persqueue/pq_l2_cache.cpp34
-rw-r--r--ydb/core/persqueue/pq_l2_cache.h7
-rw-r--r--ydb/core/persqueue/pq_l2_service.h11
-rw-r--r--ydb/core/persqueue/read.h70
-rw-r--r--ydb/core/persqueue/ut/cache_eviction_ut.cpp177
-rw-r--r--ydb/core/persqueue/ut/partition_ut.cpp4
-rw-r--r--ydb/core/persqueue/ut/ya.make1
-rw-r--r--ydb/core/testlib/test_pq_client.h39
-rw-r--r--ydb/public/sdk/cpp/src/client/topic/ut/topic_to_table_ut.cpp91
-rw-r--r--ydb/services/persqueue_v1/persqueue_ut.cpp8
16 files changed, 588 insertions, 38 deletions
diff --git a/ydb/core/persqueue/cache_eviction.h b/ydb/core/persqueue/cache_eviction.h
index 8826101d40..6826fb8164 100644
--- a/ydb/core/persqueue/cache_eviction.h
+++ b/ydb/core/persqueue/cache_eviction.h
@@ -5,6 +5,7 @@
#include <ydb/core/base/appdata.h>
#include <ydb/core/persqueue/events/internal.h>
+#include <ydb/core/persqueue/map_subrange.h>
namespace NKikimr {
namespace NPQ {
@@ -25,10 +26,18 @@ namespace NPQ {
{
}
- bool operator == (const TBlobId& r) const {
+ bool operator==(const TBlobId& r) const {
return Partition.IsEqual(r.Partition) && Offset == r.Offset && PartNo == r.PartNo;
}
+ bool operator<(const TBlobId& r) const {
+ auto makeTuple = [](const TBlobId& v) {
+ return std::make_tuple(v.Partition, v.Offset, v.PartNo, v.Count, v.InternalPartsCount);
+ };
+
+ return makeTuple(*this) < makeTuple(r);
+ }
+
ui64 Hash() const {
return Hash128to32((ui64(Partition.InternalPartitionId) << 17) + (Partition.IsSupportivePartition() ? 0 : (1 << 16)) + PartNo, Offset);
}
@@ -51,12 +60,26 @@ namespace NPQ {
TypeWrite
};
+ struct TDeleteBlobRange {
+ TString Begin;
+ bool IncludeBegin;
+ TString End;
+ bool IncludeEnd;
+ };
+
+ struct TRenameBlob {
+ TString From;
+ TString To;
+ };
+
ERequestType Type;
TActorId Sender;
ui64 CookiePQ;
TPartitionId Partition;
ui32 MetadataWritesCount;
TVector<TRequestedBlob> Blobs;
+ TVector<TDeleteBlobRange> DeletedBlobs;
+ TVector<TRenameBlob> RenamedBlobs;
TKvRequest(ERequestType type, TActorId sender, ui64 cookie, const TPartitionId& partition)
: Type(type)
@@ -175,13 +198,13 @@ namespace NPQ {
, Source(SourcePrefetch)
{}
- const TCacheValue::TPtr GetBlob() const { return Blob.lock(); }
+ const TCacheValue::TPtr GetBlob() const { return Blob; }
private:
- TCacheValue::TWeakPtr Blob;
+ TCacheValue::TPtr Blob;
};
- using TMapType = THashMap<TBlobId, TValueL1>;
+ using TMapType = TMap<TBlobId, TValueL1>;
struct TCounters {
ui64 SizeBytes = 0;
@@ -251,30 +274,90 @@ namespace NPQ {
{
auto reqData = MakeHolder<TCacheL2Request>(TabletId);
+ DeleteBlobs(kvReq, *reqData, ctx);
+ RenameBlobs(kvReq, *reqData, ctx);
+ SaveBlobs(kvReq, *reqData, ctx);
+
+ auto l2Request = MakeHolder<TEvPqCache::TEvCacheL2Request>(reqData.Release());
+ ctx.Send(MakePersQueueL2CacheID(), l2Request.Release()); // -> L2
+ }
+
+ void SaveBlobs(const TKvRequest& kvReq, TCacheL2Request& reqData, const TActorContext& ctx)
+ {
for (const TRequestedBlob& reqBlob : kvReq.Blobs) {
TBlobId blob(kvReq.Partition, reqBlob.Offset, reqBlob.PartNo, reqBlob.Count, reqBlob.InternalPartsCount);
- { // there could be a new blob with same id (for big messages)
- if (RemoveExists(ctx, blob)) {
- reqData->RemovedBlobs.emplace_back(kvReq.Partition, reqBlob.Offset, reqBlob.PartNo, nullptr);
- }
+
+ // there could be a new blob with same id (for big messages)
+ if (RemoveExists(ctx, blob)) {
+ reqData.RemovedBlobs.emplace_back(kvReq.Partition, reqBlob.Offset, reqBlob.PartNo, nullptr);
}
- TCacheValue::TPtr cached(new TCacheValue(reqBlob.Value, ctx.SelfID, TAppData::TimeProvider->Now()));
+ auto cached = std::make_shared<TCacheValue>(reqBlob.Value, ctx.SelfID, TAppData::TimeProvider->Now());
TValueL1 valL1(cached, cached->DataSize(), TValueL1::SourceHead);
Cache[blob] = valL1; // weak
Counters.Inc(valL1);
if (L1Strategy)
L1Strategy->SaveHeadBlob(blob);
- reqData->StoredBlobs.emplace_back(kvReq.Partition, reqBlob.Offset, reqBlob.PartNo, cached);
+ reqData.StoredBlobs.emplace_back(kvReq.Partition, reqBlob.Offset, reqBlob.PartNo, cached);
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Caching head blob in L1. Partition "
<< blob.Partition << " offset " << blob.Offset << " count " << blob.Count
<< " size " << reqBlob.Value.size() << " actorID " << ctx.SelfID);
}
+ }
- auto l2Request = MakeHolder<TEvPqCache::TEvCacheL2Request>(reqData.Release());
- ctx.Send(MakePersQueueL2CacheID(), l2Request.Release()); // -> L2
+ TBlobId MakeBlobId(const TString& s)
+ {
+ if (s.length() == TKeyPrefix::MarkPosition()) {
+ TPartitionId partitionId;
+ partitionId.OriginalPartitionId = FromString<ui32>(s.data() + 1, 10);
+ partitionId.InternalPartitionId = partitionId.OriginalPartitionId;
+ return {partitionId, 0, 0, 0, 0};
+ } else {
+ TKey key(s);
+ return {key.GetPartition(), key.GetOffset(), key.GetPartNo(), key.GetCount(), key.GetInternalPartsCount()};
+ }
+ }
+
+ void RenameBlobs(const TKvRequest& kvReq, TCacheL2Request& reqData, const TActorContext& ctx)
+ {
+ for (const auto& [oldKey, newKey] : kvReq.RenamedBlobs) {
+ TBlobId oldBlob = MakeBlobId(oldKey);
+ TBlobId newBlob = MakeBlobId(newKey);
+ if (RenameExists(ctx, oldBlob, newBlob)) {
+ reqData.RenamedBlobs.emplace_back(std::piecewise_construct,
+ std::make_tuple(oldBlob.Partition, oldBlob.Offset, oldBlob.PartNo, nullptr),
+ std::make_tuple(newBlob.Partition, newBlob.Offset, newBlob.PartNo, nullptr));
+
+ LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Renaming head blob in L1. Old partition "
+ << oldBlob.Partition << " old offset " << oldBlob.Offset << " old count " << oldBlob.Count
+ << " new partition " << newBlob.Partition << " new offset " << newBlob.Offset << " new count " << newBlob.Count
+ << " actorID " << ctx.SelfID);
+ }
+ }
+ }
+
+ void DeleteBlobs(const TKvRequest& kvReq, TCacheL2Request& reqData, const TActorContext& ctx)
+ {
+ for (const auto& range : kvReq.DeletedBlobs) {
+ auto [lowerBound, upperBound] = MapSubrange(Cache,
+ MakeBlobId(range.Begin), range.IncludeBegin,
+ MakeBlobId(range.End), range.IncludeEnd);
+
+ for (auto i = lowerBound; i != upperBound; ++i) {
+ const auto& [blob, value] = *i;
+
+ reqData.RemovedBlobs.emplace_back(blob.Partition, blob.Offset, blob.PartNo, nullptr);
+ Counters.Dec(value);
+
+ LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Deleting head blob in L1. Partition "
+ << blob.Partition << " offset " << blob.Offset << " count " << blob.Count
+ << " actorID " << ctx.SelfID);
+ }
+
+ Cache.erase(lowerBound, upperBound);
+ }
}
void SavePrefetchBlobs(const TActorContext& ctx, const TKvRequest& kvReq, const TVector<bool>& store)
@@ -461,6 +544,26 @@ namespace NPQ {
TValueL1 value;
return CheckExists(ctx, blob, value, true);
}
+
+ bool RenameExists(const TActorContext& ctx, const TBlobId& oldBlob, const TBlobId& newBlob)
+ {
+ Y_UNUSED(ctx);
+
+ auto it = Cache.find(oldBlob);
+ if (it == Cache.end()) {
+ return false;
+ }
+
+ TValueL1 value = it->second;
+ Cache.erase(it);
+
+ Cache[newBlob] = value;
+ Counters.Inc(value);
+ if (L1Strategy)
+ L1Strategy->SaveHeadBlob(newBlob);
+
+ return true;
+ }
};
} //NPQ
diff --git a/ydb/core/persqueue/map_subrange.h b/ydb/core/persqueue/map_subrange.h
new file mode 100644
index 0000000000..1ab83f0076
--- /dev/null
+++ b/ydb/core/persqueue/map_subrange.h
@@ -0,0 +1,24 @@
+namespace NKikimr::NPQ {
+
+template<class M, class I = typename M::const_iterator>
+std::pair<I, I> MapSubrange(const M& map,
+ const typename M::key_type& begin, bool includeBegin,
+ const typename M::key_type& end, bool includeEnd)
+{
+ if (map.empty()) {
+ return {map.end(), map.end()};
+ }
+
+ if (begin == end) {
+ if ((includeBegin != includeEnd) || !includeBegin) {
+ return {map.end(), map.end()};
+ }
+ }
+
+ auto leftBorder = includeBegin ? map.lower_bound(begin) : map.upper_bound(begin);
+ auto rightBorder = includeEnd ? map.upper_bound(end) : map.lower_bound(end);
+
+ return {leftBorder, rightBorder};
+}
+
+}
diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp
index b07e9e40a9..657402d367 100644
--- a/ydb/core/persqueue/partition.cpp
+++ b/ydb/core/persqueue/partition.cpp
@@ -1827,7 +1827,7 @@ void TPartition::ProcessTxsAndUserActs(const TActorContext& ctx)
AddCmdDeleteRangeForAllKeys(*PersistRequest);
- ctx.Send(Tablet, PersistRequest.Release(), 0, 0, PersistRequestSpan.GetTraceId());
+ ctx.Send(BlobCache, PersistRequest.Release(), 0, 0, PersistRequestSpan.GetTraceId());
PersistRequest = nullptr;
CurrentPersistRequestSpan = std::move(PersistRequestSpan);
PersistRequestSpan = NWilson::TSpan();
@@ -1993,7 +1993,9 @@ void TPartition::RunPersist() {
//haveChanges = true;
}
- TryAddDeleteHeadKeysToPersistRequest();
+ if (TryAddDeleteHeadKeysToPersistRequest()) {
+ haveChanges = true;
+ }
if (haveChanges || TxIdHasChanged || !AffectedUsers.empty() || ChangeConfig) {
WriteCycleStartTime = now;
@@ -2055,8 +2057,10 @@ void TPartition::RunPersist() {
PersistRequest = nullptr;
}
-void TPartition::TryAddDeleteHeadKeysToPersistRequest()
+bool TPartition::TryAddDeleteHeadKeysToPersistRequest()
{
+ bool haveChanges = !DeletedKeys.empty();
+
while (!DeletedKeys.empty()) {
auto& k = DeletedKeys.back();
@@ -2070,6 +2074,8 @@ void TPartition::TryAddDeleteHeadKeysToPersistRequest()
DeletedKeys.pop_back();
}
+
+ return haveChanges;
}
void TPartition::DumpKeyValueRequest(const NKikimrClient::TKeyValueRequest& request)
diff --git a/ydb/core/persqueue/partition.h b/ydb/core/persqueue/partition.h
index 9ecc0bac0e..197ef34f61 100644
--- a/ydb/core/persqueue/partition.h
+++ b/ydb/core/persqueue/partition.h
@@ -982,7 +982,7 @@ private:
size_t WriteNewSizeFromSupportivePartitions = 0;
- void TryAddDeleteHeadKeysToPersistRequest();
+ bool TryAddDeleteHeadKeysToPersistRequest();
void DumpKeyValueRequest(const NKikimrClient::TKeyValueRequest& request);
TBlobKeyTokenPtr MakeBlobKeyToken(const TString& key);
diff --git a/ydb/core/persqueue/partition_id.h b/ydb/core/persqueue/partition_id.h
index 0c1dbb8d3a..1582a71592 100644
--- a/ydb/core/persqueue/partition_id.h
+++ b/ydb/core/persqueue/partition_id.h
@@ -43,6 +43,15 @@ public:
(InternalPartitionId == rhs.InternalPartitionId);
}
+ bool IsLess(const TPartitionId& rhs) const
+ {
+ auto makeTuple = [](const TPartitionId& v) {
+ return std::make_tuple(v.OriginalPartitionId, v.WriteId, v.InternalPartitionId);
+ };
+
+ return makeTuple(*this) < makeTuple(rhs);
+ }
+
void ToStream(IOutputStream& s) const
{
if (WriteId.Defined()) {
@@ -76,6 +85,12 @@ bool operator==(const TPartitionId& lhs, const TPartitionId& rhs)
}
inline
+bool operator<(const TPartitionId& lhs, const TPartitionId& rhs)
+{
+ return lhs.IsLess(rhs);
+}
+
+inline
IOutputStream& operator<<(IOutputStream& s, const TPartitionId& v)
{
v.ToStream(s);
diff --git a/ydb/core/persqueue/partition_write.cpp b/ydb/core/persqueue/partition_write.cpp
index fce4a23606..c590b3b6e4 100644
--- a/ydb/core/persqueue/partition_write.cpp
+++ b/ydb/core/persqueue/partition_write.cpp
@@ -1069,7 +1069,6 @@ void TPartition::AddCmdWrite(const std::optional<TPartitionedBlob::TFormedBlobIn
TKey resKey = newWrite->Key;
resKey.SetType(TKeyPrefix::TypeData);
- write->SetKeyToCache(resKey.ToString());
WriteCycleSize += newWrite->Value.size();
}
@@ -1486,9 +1485,6 @@ void TPartition::AddNewWriteBlob(std::pair<TKey, ui32>& res, TEvKeyValue::TEvReq
write->SetKey(key.Data(), key.Size());
write->SetValue(valueD);
- if (!key.IsHead())
- write->SetKeyToCache(key.Data(), key.Size());
-
bool isInline = key.IsHead() && valueD.size() < MAX_INLINE_SIZE;
if (isInline)
diff --git a/ydb/core/persqueue/pq_l2_cache.cpp b/ydb/core/persqueue/pq_l2_cache.cpp
index 404f45dbaa..d8ca1cb39e 100644
--- a/ydb/core/persqueue/pq_l2_cache.cpp
+++ b/ydb/core/persqueue/pq_l2_cache.cpp
@@ -34,6 +34,7 @@ void TPersQueueCacheL2::Handle(TEvPqCache::TEvCacheL2Request::TPtr& ev, const TA
TouchBlobs(ctx, tabletId, request->ExpectedBlobs, false);
RemoveBlobs(ctx, tabletId, request->RemovedBlobs);
RegretBlobs(ctx, tabletId, request->MissedBlobs);
+ RenameBlobs(ctx, tabletId, request->RenamedBlobs);
THashMap<TKey, TCacheValue::TPtr> evicted;
AddBlobs(ctx, tabletId, request->StoredBlobs, evicted);
@@ -46,7 +47,7 @@ void TPersQueueCacheL2::SendResponses(const TActorContext& ctx, const THashMap<T
TInstant now = TAppData::TimeProvider->Now();
THashMap<TActorId, THolder<TCacheL2Response>> responses;
- for (auto rm : evictedBlobs) {
+ for (const auto& rm : evictedBlobs) {
const TKey& key = rm.first;
TCacheValue::TPtr evicted = rm.second;
@@ -57,7 +58,7 @@ void TPersQueueCacheL2::SendResponses(const TActorContext& ctx, const THashMap<T
}
Y_ABORT_UNLESS(key.TabletId == resp->TabletId, "PQ L2. Multiple topics in one PQ tablet.");
- resp->Removed.push_back({key.Partition, key.Offset, key.PartNo, evicted});
+ resp->Removed.emplace_back(key.Partition, key.Offset, key.PartNo, evicted);
RetentionTime = now - evicted->GetAccessTime();
if (RetentionTime < KeepTime)
@@ -72,6 +73,13 @@ void TPersQueueCacheL2::SendResponses(const TActorContext& ctx, const THashMap<T
}
}
+void TPersQueueCacheL2::Handle(TEvPqCache::TEvCacheKeysRequest::TPtr& ev, const TActorContext& ctx)
+{
+ auto response = MakeHolder<TEvPqCache::TEvCacheKeysResponse>();
+ response->RenamedKeys = RenamedKeys;
+ ctx.Send(ev->Sender, response.Release());
+}
+
/// @return outRemoved - map of evicted items. L1 should be noticed about them
void TPersQueueCacheL2::AddBlobs(const TActorContext& ctx, ui64 tabletId, const TVector<TCacheBlobL2>& blobs,
THashMap<TKey, TCacheValue::TPtr>& outEvicted)
@@ -156,6 +164,28 @@ void TPersQueueCacheL2::RemoveBlobs(const TActorContext& ctx, ui64 tabletId, con
}
}
+void TPersQueueCacheL2::RenameBlobs(const TActorContext& ctx, ui64 tabletId,
+ const TVector<std::pair<TCacheBlobL2, TCacheBlobL2>>& blobs)
+{
+ RenamedKeys += blobs.size();
+
+ for (const auto& [oldBlob, newBlob] : blobs) {
+ TKey oldKey(tabletId, oldBlob);
+ auto it = Cache.FindWithoutPromote(oldKey);
+ if (it == Cache.End()) {
+ continue;
+ }
+
+ TKey newKey(tabletId, newBlob);
+ Cache.Insert(newKey, *it);
+ Cache.Erase(it);
+
+ LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "PQ Cache (L2). Renamed. Tablet '" << tabletId
+ << "' old partition " << oldBlob.Partition << " old offset " << oldBlob.Offset
+ << " new partition " << newBlob.Partition << " new offset " << newBlob.Offset);
+ }
+}
+
void TPersQueueCacheL2::TouchBlobs(const TActorContext& ctx, ui64 tabletId, const TVector<TCacheBlobL2>& blobs, bool isHit)
{
TInstant now = TAppData::TimeProvider->Now();
diff --git a/ydb/core/persqueue/pq_l2_cache.h b/ydb/core/persqueue/pq_l2_cache.h
index 42aa184814..3747da4d1e 100644
--- a/ydb/core/persqueue/pq_l2_cache.h
+++ b/ydb/core/persqueue/pq_l2_cache.h
@@ -93,6 +93,7 @@ private:
switch (ev->GetTypeRewrite()) {
HFuncTraced(TEvents::TEvPoisonPill, Handle);
HFuncTraced(TEvPqCache::TEvCacheL2Request, Handle);
+ HFuncTraced(TEvPqCache::TEvCacheKeysRequest, Handle);
HFuncTraced(NMon::TEvHttpInfo, Handle);
default:
break;
@@ -110,11 +111,15 @@ private:
void Handle(TEvPqCache::TEvCacheL2Request::TPtr& ev, const TActorContext& ctx);
void SendResponses(const TActorContext& ctx, const THashMap<TKey, TCacheValue::TPtr>& evicted);
+ void Handle(TEvPqCache::TEvCacheKeysRequest::TPtr& ev, const TActorContext& ctx);
+
void AddBlobs(const TActorContext& ctx, ui64 tabletId, const TVector<TCacheBlobL2>& blobs,
THashMap<TKey, TCacheValue::TPtr>& outEvicted);
void RemoveBlobs(const TActorContext& ctx, ui64 tabletId, const TVector<TCacheBlobL2>& blobs);
void TouchBlobs(const TActorContext& ctx, ui64 tabletId, const TVector<TCacheBlobL2>& blobs, bool isHit = true);
void RegretBlobs(const TActorContext& ctx, ui64 tabletId, const TVector<TCacheBlobL2>& blobs);
+ void RenameBlobs(const TActorContext& ctx, ui64 tabletId,
+ const TVector<std::pair<TCacheBlobL2, TCacheBlobL2>>& blobs);
static ui64 ClampMinSize(ui64 maxSize) {
static const ui64 MIN_SIZE = 32_MB;
@@ -130,6 +135,8 @@ private:
TL2Counters Counters;
TString HttpForm() const;
+
+ size_t RenamedKeys = 0;
};
} // NPQ
diff --git a/ydb/core/persqueue/pq_l2_service.h b/ydb/core/persqueue/pq_l2_service.h
index d48f303eec..a02b2d9bfb 100644
--- a/ydb/core/persqueue/pq_l2_service.h
+++ b/ydb/core/persqueue/pq_l2_service.h
@@ -83,6 +83,7 @@ struct TCacheL2Request {
TVector<TCacheBlobL2> RemovedBlobs;
TVector<TCacheBlobL2> ExpectedBlobs;
TVector<TCacheBlobL2> MissedBlobs;
+ TVector<std::pair<TCacheBlobL2, TCacheBlobL2>> RenamedBlobs;
explicit TCacheL2Request(ui64 tabletId)
: TabletId(tabletId)
@@ -102,7 +103,8 @@ struct TEvPqCache {
enum EEv {
EvCacheRequest = EventSpaceBegin(TKikimrEvents::ES_PQ_L2_CACHE),
EvCacheResponse,
-
+ EvCacheKeysRequest,
+ EvCacheKeysResponse,
EvEnd
};
@@ -123,6 +125,13 @@ struct TEvPqCache {
: Data(data)
{}
};
+
+ struct TEvCacheKeysRequest : TEventLocal<TEvCacheKeysRequest, EvCacheKeysRequest> {
+ };
+
+ struct TEvCacheKeysResponse : TEventLocal<TEvCacheKeysResponse, EvCacheKeysResponse> {
+ size_t RenamedKeys = 0;
+ };
};
} // NPQ
diff --git a/ydb/core/persqueue/read.h b/ydb/core/persqueue/read.h
index 54986321af..365220586a 100644
--- a/ydb/core/persqueue/read.h
+++ b/ydb/core/persqueue/read.h
@@ -264,15 +264,32 @@ namespace NPQ {
auto& srcRequest = ev->Get()->Record;
TKvRequest kvReq(TKvRequest::TypeWrite, ev->Sender, Max<ui64>(), TPartitionId(Max<ui32>()));
+
+ SaveCmdWrite(srcRequest, kvReq, ctx);
+ SaveCmdRename(srcRequest, kvReq, ctx);
+ SaveCmdDelete(srcRequest, kvReq, ctx);
+
+ ui64 cookie = SaveKvRequest(std::move(kvReq));
+
+ auto request = MakeHolder<TEvKeyValue::TEvRequest>();
+ request->Record = std::move(srcRequest);
+ request->Record.SetCookie(cookie);
+
+ ctx.Send(Tablet, request.Release(), 0, 0, std::move(ev->TraceId)); // -> KV
+ }
+
+ void SaveCmdWrite(const NKikimrClient::TKeyValueRequest& srcRequest, TKvRequest& kvReq, const TActorContext& ctx)
+ {
kvReq.Blobs.reserve(srcRequest.CmdWriteSize());
for (ui32 i = 0; i < srcRequest.CmdWriteSize(); ++i) {
const auto& cmd = srcRequest.GetCmdWrite(i);
- if (cmd.HasKeyToCache()) {
- const TString& strKey = cmd.GetKeyToCache();
- Y_ABORT_UNLESS(strKey.size() == TKey::KeySize(), "Unexpected key size: %" PRIu64, strKey.size());
+ const TString& strKey = cmd.GetKey();
+ if (IsDataKey(strKey)) {
+ Y_ABORT_UNLESS((strKey.size() >= TKey::KeySize()) && (strKey.size() - TKey::KeySize() <= 1),
+ "Unexpected key size: %" PRIu64 " (%s)",
+ strKey.size(), strKey.data());
TKey key(strKey);
- Y_ABORT_UNLESS(!key.IsHead());
const TString& value = cmd.GetValue();
kvReq.Partition = key.GetPartition();
@@ -285,14 +302,49 @@ namespace NPQ {
kvReq.MetadataWritesCount++;
}
}
+ }
- ui64 cookie = SaveKvRequest(std::move(kvReq));
+ void SaveCmdRename(const NKikimrClient::TKeyValueRequest& srcRequest, TKvRequest& kvReq, const TActorContext& ctx)
+ {
+ kvReq.RenamedBlobs.reserve(srcRequest.CmdRenameSize());
- auto request = MakeHolder<TEvKeyValue::TEvRequest>();
- request->Record = std::move(srcRequest);
- request->Record.SetCookie(cookie);
+ for (ui32 i = 0; i < srcRequest.CmdRenameSize(); ++i) {
+ const auto& cmd = srcRequest.GetCmdRename(i);
+ if (!IsDataKey(cmd.GetOldKey()) || !IsDataKey(cmd.GetNewKey())) {
+ continue;
+ }
+ kvReq.RenamedBlobs.emplace_back(cmd.GetOldKey(), cmd.GetNewKey());
- ctx.Send(Tablet, request.Release(), 0, 0, std::move(ev->TraceId)); // -> KV
+ LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "CacheProxy. Rename blob from " << cmd.GetOldKey() << " to " << cmd.GetNewKey());
+ }
+ }
+
+ void SaveCmdDelete(const NKikimrClient::TKeyValueRequest& srcRequest, TKvRequest& kvReq, const TActorContext& ctx)
+ {
+ kvReq.DeletedBlobs.reserve(srcRequest.CmdDeleteRangeSize());
+
+ for (ui32 i = 0; i < srcRequest.CmdDeleteRangeSize(); ++i) {
+ const auto& cmd = srcRequest.GetCmdDeleteRange(i);
+ const auto& range = cmd.GetRange();
+ if (!IsDataKey(range.GetFrom()) || !IsDataKey(range.GetTo())) {
+ continue;
+ }
+ kvReq.DeletedBlobs.emplace_back(range.GetFrom(), range.GetIncludeFrom(),
+ range.GetTo(), range.GetIncludeTo());
+
+ LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE,
+ "CacheProxy. Delete blobs from " <<
+ range.GetFrom() << "(" << (range.GetIncludeFrom() ? '+' : '-') << ") to " <<
+ range.GetTo() << "(" << (range.GetIncludeTo() ? '+' : '-') << ")");
+ }
+ }
+
+ bool IsDataKey(const TString& key) const {
+ if (key.empty()) {
+ return false;
+ }
+ const char type = std::tolower(key.front());
+ return type == TKeyPrefix::TypeData; // TypeData || ServiceTypeData
}
void Handle(TEvPqCache::TEvCacheL2Response::TPtr& ev, const TActorContext& ctx)
diff --git a/ydb/core/persqueue/ut/cache_eviction_ut.cpp b/ydb/core/persqueue/ut/cache_eviction_ut.cpp
new file mode 100644
index 0000000000..77b6273723
--- /dev/null
+++ b/ydb/core/persqueue/ut/cache_eviction_ut.cpp
@@ -0,0 +1,177 @@
+#include <library/cpp/testing/unittest/registar.h>
+#include <ydb/core/persqueue/map_subrange.h>
+
+Y_UNIT_TEST_SUITE(CacheEviction) {
+
+template<class K>
+struct TKeyRange {
+ K Begin;
+ bool IncludeBegin;
+ K End;
+ bool IncludeEnd;
+};
+
+template<class K>
+TKeyRange<K> OpenOpen(K begin, K end)
+{
+ return {.Begin=begin, .IncludeBegin=false, .End=end, .IncludeEnd=false};
+}
+
+template<class K>
+TKeyRange<K> OpenClose(K begin, K end)
+{
+ return {.Begin=begin, .IncludeBegin=false, .End=end, .IncludeEnd=true};
+}
+
+template<class K>
+TKeyRange<K> CloseOpen(K begin, K end)
+{
+ return {.Begin=begin, .IncludeBegin=true, .End=end, .IncludeEnd=false};
+}
+
+template<class K>
+TKeyRange<K> CloseClose(K begin, K end)
+{
+ return {.Begin=begin, .IncludeBegin=true, .End=end, .IncludeEnd=true};
+}
+
+void CheckDeleteKeys(const TMap<int, int>& map, const TKeyRange<int>& range, const TSet<int>& expectedKeys)
+{
+ auto [lowerBound, upperBound] =
+ NKikimr::NPQ::MapSubrange(map,
+ range.Begin, range.IncludeBegin,
+ range.End, range.IncludeEnd);
+
+ TSet<int> remainKeys;
+ for (const auto& [key, _] : map) {
+ remainKeys.insert(key);
+ }
+
+ for (; lowerBound != upperBound; ++lowerBound) {
+ remainKeys.erase(lowerBound->first);
+ }
+
+ UNIT_ASSERT_VALUES_EQUAL(remainKeys, expectedKeys);
+}
+
+Y_UNIT_TEST(DeleteKeys) {
+ const TMap<int, int> map{
+ {11, 1},
+ {13, 1},
+ {15, 1},
+ {17, 1},
+ {19, 1}
+ };
+
+ CheckDeleteKeys(map, OpenOpen ( 2, 10), {11, 13, 15, 17, 19});
+ CheckDeleteKeys(map, OpenClose ( 2, 10), {11, 13, 15, 17, 19});
+ CheckDeleteKeys(map, CloseOpen ( 2, 10), {11, 13, 15, 17, 19});
+ CheckDeleteKeys(map, CloseClose( 2, 10), {11, 13, 15, 17, 19});
+
+ CheckDeleteKeys(map, OpenOpen ( 2, 11), {11, 13, 15, 17, 19});
+ CheckDeleteKeys(map, OpenClose ( 2, 11), { 13, 15, 17, 19});
+ CheckDeleteKeys(map, CloseOpen ( 2, 11), {11, 13, 15, 17, 19});
+ CheckDeleteKeys(map, CloseClose( 2, 11), { 13, 15, 17, 19});
+
+ CheckDeleteKeys(map, OpenOpen ( 2, 14), { 15, 17, 19});
+ CheckDeleteKeys(map, OpenClose ( 2, 14), { 15, 17, 19});
+ CheckDeleteKeys(map, CloseOpen ( 2, 14), { 15, 17, 19});
+ CheckDeleteKeys(map, CloseClose( 2, 14), { 15, 17, 19});
+
+ CheckDeleteKeys(map, OpenOpen ( 2, 15), { 15, 17, 19});
+ CheckDeleteKeys(map, OpenClose ( 2, 15), { 17, 19});
+ CheckDeleteKeys(map, CloseOpen ( 2, 15), { 15, 17, 19});
+ CheckDeleteKeys(map, CloseClose( 2, 15), { 17, 19});
+
+ CheckDeleteKeys(map, OpenOpen ( 2, 19), { 19});
+ CheckDeleteKeys(map, OpenClose ( 2, 19), { });
+ CheckDeleteKeys(map, CloseOpen ( 2, 19), { 19});
+ CheckDeleteKeys(map, CloseClose( 2, 19), { });
+
+ CheckDeleteKeys(map, OpenOpen ( 2, 21), { });
+ CheckDeleteKeys(map, OpenClose ( 2, 21), { });
+ CheckDeleteKeys(map, CloseOpen ( 2, 21), { });
+ CheckDeleteKeys(map, CloseClose( 2, 21), { });
+
+ CheckDeleteKeys(map, OpenOpen (11, 11), {11, 13, 15, 17, 19});
+ CheckDeleteKeys(map, OpenClose (11, 11), {11, 13, 15, 17, 19});
+ CheckDeleteKeys(map, CloseOpen (11, 11), {11, 13, 15, 17, 19});
+ CheckDeleteKeys(map, CloseClose(11, 11), { 13, 15, 17, 19});
+
+ CheckDeleteKeys(map, OpenOpen (11, 14), {11, 15, 17, 19});
+ CheckDeleteKeys(map, OpenClose (11, 14), {11, 15, 17, 19});
+ CheckDeleteKeys(map, CloseOpen (11, 14), { 15, 17, 19});
+ CheckDeleteKeys(map, CloseClose(11, 14), { 15, 17, 19});
+
+ CheckDeleteKeys(map, OpenOpen (11, 15), {11, 15, 17, 19});
+ CheckDeleteKeys(map, OpenClose (11, 15), {11, 17, 19});
+ CheckDeleteKeys(map, CloseOpen (11, 15), { 15, 17, 19});
+ CheckDeleteKeys(map, CloseClose(11, 15), { 17, 19});
+
+ CheckDeleteKeys(map, OpenOpen (11, 19), {11, 19});
+ CheckDeleteKeys(map, OpenClose (11, 19), {11 });
+ CheckDeleteKeys(map, CloseOpen (11, 19), { 19});
+ CheckDeleteKeys(map, CloseClose(11, 19), { });
+
+ CheckDeleteKeys(map, OpenOpen (11, 21), {11 });
+ CheckDeleteKeys(map, OpenClose (11, 21), {11 });
+ CheckDeleteKeys(map, CloseOpen (11, 21), { });
+ CheckDeleteKeys(map, CloseClose(11, 21), { });
+
+ CheckDeleteKeys(map, OpenOpen (12, 14), {11, 15, 17, 19});
+ CheckDeleteKeys(map, OpenClose (12, 14), {11, 15, 17, 19});
+ CheckDeleteKeys(map, CloseOpen (12, 14), {11, 15, 17, 19});
+ CheckDeleteKeys(map, CloseClose(12, 14), {11, 15, 17, 19});
+
+ CheckDeleteKeys(map, OpenOpen (14, 14), {11, 13, 15, 17, 19});
+ CheckDeleteKeys(map, OpenClose (14, 14), {11, 13, 15, 17, 19});
+ CheckDeleteKeys(map, CloseOpen (14, 14), {11, 13, 15, 17, 19});
+ CheckDeleteKeys(map, CloseClose(14, 14), {11, 13, 15, 17, 19});
+
+ CheckDeleteKeys(map, OpenOpen (14, 15), {11, 13, 15, 17, 19});
+ CheckDeleteKeys(map, OpenClose (14, 15), {11, 13, 17, 19});
+ CheckDeleteKeys(map, CloseOpen (14, 15), {11, 13, 15, 17, 19});
+ CheckDeleteKeys(map, CloseClose(14, 15), {11, 13, 17, 19});
+
+ CheckDeleteKeys(map, OpenOpen (14, 19), {11, 13, 19});
+ CheckDeleteKeys(map, OpenClose (14, 19), {11, 13 });
+ CheckDeleteKeys(map, CloseOpen (14, 19), {11, 13, 19});
+ CheckDeleteKeys(map, CloseClose(14, 19), {11, 13 });
+
+ CheckDeleteKeys(map, OpenOpen (14, 21), {11, 13 });
+ CheckDeleteKeys(map, OpenClose (14, 21), {11, 13 });
+ CheckDeleteKeys(map, CloseOpen (14, 21), {11, 13 });
+ CheckDeleteKeys(map, CloseClose(14, 21), {11, 13 });
+
+ CheckDeleteKeys(map, OpenOpen (15, 15), {11, 13, 15, 17, 19});
+ CheckDeleteKeys(map, OpenClose (15, 15), {11, 13, 15, 17, 19});
+ CheckDeleteKeys(map, CloseOpen (15, 15), {11, 13, 15, 17, 19});
+ CheckDeleteKeys(map, CloseClose(15, 15), {11, 13, 17, 19});
+
+ CheckDeleteKeys(map, OpenOpen (15, 19), {11, 13, 15, 19});
+ CheckDeleteKeys(map, OpenClose (15, 19), {11, 13, 15, });
+ CheckDeleteKeys(map, CloseOpen (15, 19), {11, 13, 19});
+ CheckDeleteKeys(map, CloseClose(15, 19), {11, 13 });
+
+ CheckDeleteKeys(map, OpenOpen (15, 21), {11, 13, 15 });
+ CheckDeleteKeys(map, OpenClose (15, 21), {11, 13, 15 });
+ CheckDeleteKeys(map, CloseOpen (15, 21), {11, 13 });
+ CheckDeleteKeys(map, CloseClose(15, 21), {11, 13 });
+
+ CheckDeleteKeys(map, OpenOpen (19, 19), {11, 13, 15, 17, 19});
+ CheckDeleteKeys(map, OpenClose (19, 19), {11, 13, 15, 17, 19});
+ CheckDeleteKeys(map, CloseOpen (19, 19), {11, 13, 15, 17, 19});
+ CheckDeleteKeys(map, CloseClose(19, 19), {11, 13, 15, 17 });
+
+ CheckDeleteKeys(map, OpenOpen (19, 21), {11, 13, 15, 17, 19});
+ CheckDeleteKeys(map, OpenClose (19, 21), {11, 13, 15, 17, 19});
+ CheckDeleteKeys(map, CloseOpen (19, 21), {11, 13, 15, 17 });
+ CheckDeleteKeys(map, CloseClose(19, 21), {11, 13, 15, 17 });
+
+ CheckDeleteKeys(map, OpenOpen (21, 21), {11, 13, 15, 17, 19});
+ CheckDeleteKeys(map, OpenClose (21, 21), {11, 13, 15, 17, 19});
+ CheckDeleteKeys(map, CloseOpen (21, 21), {11, 13, 15, 17, 19});
+ CheckDeleteKeys(map, CloseClose(21, 21), {11, 13, 15, 17, 19});
+}
+
+}
diff --git a/ydb/core/persqueue/ut/partition_ut.cpp b/ydb/core/persqueue/ut/partition_ut.cpp
index d01b686442..51aefbc2fb 100644
--- a/ydb/core/persqueue/ut/partition_ut.cpp
+++ b/ydb/core/persqueue/ut/partition_ut.cpp
@@ -1149,10 +1149,10 @@ void TPartitionFixture::ShadowPartitionCountersTest(bool isFirstClass) {
auto& counterData = meta.GetCounterData();
UNIT_ASSERT_VALUES_EQUAL(counterData.GetMessagesWrittenTotal(), cookie - 1);
UNIT_ASSERT_VALUES_EQUAL(counterData.GetMessagesWrittenGrpc(),isFirstClass ? cookie - 1 : 0);
- UNIT_ASSERT(counterData.GetBytesWrittenUncompressed() > currUncSize);
+ UNIT_ASSERT(counterData.GetBytesWrittenUncompressed() >= currUncSize);
currUncSize = counterData.GetBytesWrittenUncompressed();
UNIT_ASSERT_VALUES_EQUAL(counterData.GetBytesWrittenGrpc(), isFirstClass ? counterData.GetBytesWrittenTotal() : 0);
- UNIT_ASSERT(counterData.GetBytesWrittenTotal() > currTotalSize);
+ UNIT_ASSERT(counterData.GetBytesWrittenTotal() >= currTotalSize);
currTotalSize = counterData.GetBytesWrittenTotal();
if (cookie == 11) {
diff --git a/ydb/core/persqueue/ut/ya.make b/ydb/core/persqueue/ut/ya.make
index a2c6314bfe..54749543f8 100644
--- a/ydb/core/persqueue/ut/ya.make
+++ b/ydb/core/persqueue/ut/ya.make
@@ -49,6 +49,7 @@ SRCS(
fetch_request_ut.cpp
utils_ut.cpp
list_all_topics_ut.cpp
+ cache_eviction_ut.cpp
)
RESOURCE(
diff --git a/ydb/core/testlib/test_pq_client.h b/ydb/core/testlib/test_pq_client.h
index 48608698f4..a04a1f9aae 100644
--- a/ydb/core/testlib/test_pq_client.h
+++ b/ydb/core/testlib/test_pq_client.h
@@ -5,6 +5,7 @@
#include <ydb/core/persqueue/cluster_tracker.h>
#include <ydb/core/protos/flat_tx_scheme.pb.h>
#include <ydb/core/mind/address_classification/net_classifier.h>
+#include <ydb/core/keyvalue/keyvalue_events.h>
#include <ydb/public/api/protos/draft/persqueue_error_codes.pb.h>
#include <ydb/public/lib/deprecated/kicli/kicli.h>
#include <ydb-cpp-sdk/client/driver/driver.h>
@@ -860,6 +861,44 @@ public:
}
}
+ TVector<TString> GetPQTabletKeys(TTestActorRuntime* runtime, const TString& topic, ui32 partitionId) {
+ ui64 tabletId = Max<ui64>();
+
+ auto res = Ls("/Root/PQ/" + topic);
+ const auto& pq = res->Record.GetPathDescription().GetPersQueueGroup();
+ for (ui32 i = 0; i < pq.PartitionsSize(); ++i) {
+ const auto& partition = pq.GetPartitions(i);
+ if (partition.GetPartitionId() == partitionId) {
+ tabletId = partition.GetTabletId();
+ break;
+ }
+ }
+
+ Y_ABORT_UNLESS(tabletId != Max<ui64>());
+
+ auto request = MakeHolder<TEvKeyValue::TEvRequest>();
+ auto* cmd = request->Record.AddCmdReadRange();
+ auto* range = cmd->MutableRange();
+ range->SetFrom("\x00");
+ range->SetTo("\xFF");
+
+ TActorId sender = runtime->AllocateEdgeActor();
+ ForwardToTablet(*runtime, tabletId, sender, request.Release(), 0);
+ auto response = runtime->GrabEdgeEvent<TEvKeyValue::TEvResponse>();
+
+ TVector<TString> keys;
+
+ for (size_t i = 0; i < response->Record.ReadRangeResultSize(); ++i) {
+ const auto& result = response->Record.GetReadRangeResult(i);
+ for (size_t j = 0; j < result.PairSize(); ++j) {
+ const auto& pair = result.GetPair(j);
+ keys.push_back(pair.GetKey());
+ }
+ }
+
+ return keys;
+ }
+
bool IsTopicDeleted(const TString& name) {
auto response = RequestTopicMetadata(name);
diff --git a/ydb/public/sdk/cpp/src/client/topic/ut/topic_to_table_ut.cpp b/ydb/public/sdk/cpp/src/client/topic/ut/topic_to_table_ut.cpp
index 776e353cc1..a057b113f8 100644
--- a/ydb/public/sdk/cpp/src/client/topic/ut/topic_to_table_ut.cpp
+++ b/ydb/public/sdk/cpp/src/client/topic/ut/topic_to_table_ut.cpp
@@ -9,6 +9,7 @@
#include <ydb/core/persqueue/key.h>
#include <ydb/core/persqueue/blob.h>
#include <ydb/core/persqueue/events/global.h>
+#include <ydb/core/persqueue/pq_l2_service.h>
#include <ydb/core/tx/long_tx_service/public/events.h>
#include <ydb/core/persqueue/ut/common/autoscaling_ut_common.h>
@@ -237,6 +238,8 @@ protected:
virtual bool GetEnableHtapTx() const;
virtual bool GetAllowOlapDataQuery() const;
+ size_t GetPQCacheRenameKeysCount();
+
private:
template<class E>
E ReadEvent(TTopicReadSessionPtr reader, NTable::TTransaction& tx);
@@ -248,6 +251,8 @@ private:
ui32 partition);
std::vector<std::string> GetTabletKeys(const TActorId& actorId,
ui64 tabletId);
+ std::vector<std::string> GetPQTabletDataKeys(const TActorId& actorId,
+ ui64 tabletId);
NPQ::TWriteId GetTransactionWriteId(const TActorId& actorId,
ui64 tabletId);
void SendLongTxLockStatus(const TActorId& actorId,
@@ -1084,6 +1089,43 @@ std::vector<std::string> TFixture::GetTabletKeys(const TActorId& actorId,
return keys;
}
+std::vector<std::string> TFixture::GetPQTabletDataKeys(const TActorId& actorId,
+ ui64 tabletId)
+{
+ using namespace NKikimr::NPQ;
+
+ std::vector<std::string> keys;
+
+ for (const auto& key : GetTabletKeys(actorId, tabletId)) {
+ if (key.empty() ||
+ ((std::tolower(key.front()) != TKeyPrefix::TypeData) &&
+ (std::tolower(key.front()) != TKeyPrefix::TypeTmpData))) {
+ continue;
+ }
+
+ keys.push_back(key);
+ }
+
+ return keys;
+}
+
+size_t TFixture::GetPQCacheRenameKeysCount()
+{
+ using namespace NKikimr::NPQ;
+
+ auto& runtime = Setup->GetRuntime();
+ TActorId edge = runtime.AllocateEdgeActor();
+
+ auto request = MakeHolder<TEvPqCache::TEvCacheKeysRequest>();
+
+ runtime.Send(MakePersQueueL2CacheID(), edge, request.Release());
+
+ TAutoPtr<IEventHandle> handle;
+ auto* result = runtime.GrabEdgeEvent<TEvPqCache::TEvCacheKeysResponse>(handle);
+
+ return result->RenamedKeys;
+}
+
void TFixture::RestartLongTxService()
{
auto& runtime = Setup->GetRuntime();
@@ -2579,6 +2621,55 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_48, TFixture)
UNIT_ASSERT_GT(topicDescription.GetTotalPartitionsCount(), 2);
}
+Y_UNIT_TEST_F(WriteToTopic_Demo_50, TFixture)
+{
+ // We write to the topic in the transaction. When a transaction is committed, the keys in the blob
+ // cache are renamed.
+ CreateTopic("topic_A", TEST_CONSUMER);
+ CreateTopic("topic_B", TEST_CONSUMER);
+
+ TString message(128_KB, 'x');
+
+ WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID_1, message);
+ WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID_1);
+
+ auto session = CreateTableSession();
+
+ // tx #1
+ // After the transaction commit, there will be no large blobs in the batches. The number of renames
+ // will not change in the cache.
+ auto tx = BeginTx(session);
+
+ WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID_2, message, &tx);
+ WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID_3, message, &tx);
+
+ UNIT_ASSERT_VALUES_EQUAL(GetPQCacheRenameKeysCount(), 0);
+
+ CommitTx(tx, EStatus::SUCCESS);
+
+ Sleep(TDuration::Seconds(5));
+
+ UNIT_ASSERT_VALUES_EQUAL(GetPQCacheRenameKeysCount(), 0);
+
+ // tx #2
+ // After the commit, the party will rename one big blob
+ tx = BeginTx(session);
+
+ for (unsigned i = 0; i < 80; ++i) {
+ WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID_2, message, &tx);
+ }
+
+ WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID_3, message, &tx);
+
+ UNIT_ASSERT_VALUES_EQUAL(GetPQCacheRenameKeysCount(), 0);
+
+ CommitTx(tx, EStatus::SUCCESS);
+
+ Sleep(TDuration::Seconds(5));
+
+ UNIT_ASSERT_VALUES_EQUAL(GetPQCacheRenameKeysCount(), 1);
+}
+
class TFixtureSinks : public TFixture {
protected:
void CreateRowTable(const TString& path);
diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp
index 1b2fefc976..81ab1f45f9 100644
--- a/ydb/services/persqueue_v1/persqueue_ut.cpp
+++ b/ydb/services/persqueue_v1/persqueue_ut.cpp
@@ -2839,10 +2839,10 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
ui32 fromDisk = info0.BlobsFromDisk + info16.BlobsFromDisk;
ui32 fromCache = info0.BlobsFromCache + info16.BlobsFromCache;
- UNIT_ASSERT(fromDisk > 0);
- UNIT_ASSERT(fromDisk < 5);
- UNIT_ASSERT(fromCache > 0);
- UNIT_ASSERT(fromCache < 5);
+ UNIT_ASSERT_GE(fromDisk, 0);
+ UNIT_ASSERT_LE(fromDisk, 6);
+ UNIT_ASSERT_GT(fromCache, 0);
+ UNIT_ASSERT_LE(fromCache, 6);
}
Y_UNIT_TEST(CacheHead) {