diff options
author | Alek5andr-Kotov <akotov@ydb.tech> | 2025-03-27 17:57:36 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-03-27 14:57:36 +0000 |
commit | 47cc8b277d0188a6edc257e188843affd231d156 (patch) | |
tree | f29f439dc897377ea086794eff050baa96bb72fa | |
parent | 6b8ffd922403171f9968f00cc9d44857a33af2b7 (diff) | |
download | ydb-47cc8b277d0188a6edc257e188843affd231d156.tar.gz |
Keys for blocks in the PQ tablet (#16337)
-rw-r--r-- | ydb/core/persqueue/blob.cpp | 12 | ||||
-rw-r--r-- | ydb/core/persqueue/cache_eviction.h | 6 | ||||
-rw-r--r-- | ydb/core/persqueue/key.cpp | 42 | ||||
-rw-r--r-- | ydb/core/persqueue/key.h | 151 | ||||
-rw-r--r-- | ydb/core/persqueue/partition.cpp | 2 | ||||
-rw-r--r-- | ydb/core/persqueue/partition_init.cpp | 12 | ||||
-rw-r--r-- | ydb/core/persqueue/partition_read.cpp | 2 | ||||
-rw-r--r-- | ydb/core/persqueue/partition_write.cpp | 39 | ||||
-rw-r--r-- | ydb/core/persqueue/read.h | 2 | ||||
-rw-r--r-- | ydb/core/persqueue/ut/internals_ut.cpp | 53 | ||||
-rw-r--r-- | ydb/core/persqueue/ut/partition_ut.cpp | 11 | ||||
-rw-r--r-- | ydb/core/persqueue/ut/pq_ut.cpp | 12 | ||||
-rw-r--r-- | ydb/core/persqueue/ut/user_action_processor_ut.cpp | 2 |
13 files changed, 232 insertions, 114 deletions
diff --git a/ydb/core/persqueue/blob.cpp b/ydb/core/persqueue/blob.cpp index 084fbd9c6b9..54f7dd53bf4 100644 --- a/ydb/core/persqueue/blob.cpp +++ b/ydb/core/persqueue/blob.cpp @@ -883,8 +883,8 @@ auto TPartitionedBlob::CreateFormedBlob(ui32 size, bool useRename) -> std::optio Y_ABORT_UNLESS(NewHead.GetNextOffset() >= (GlueHead ? Head.Offset : NewHead.Offset)); - TKey tmpKey(TKeyPrefix::TypeTmpData, Partition, StartOffset, StartPartNo, count, InternalPartsCount, false); - TKey dataKey(TKeyPrefix::TypeData, Partition, StartOffset, StartPartNo, count, InternalPartsCount, false); + auto tmpKey = TKey::ForBody(TKeyPrefix::TypeTmpData, Partition, StartOffset, StartPartNo, count, InternalPartsCount); + auto dataKey = TKey::ForBody(TKeyPrefix::TypeData, Partition, StartOffset, StartPartNo, count, InternalPartsCount); StartOffset = Offset; StartPartNo = NextPartNo; @@ -955,13 +955,7 @@ auto TPartitionedBlob::Add(const TKey& oldKey, ui32 size) -> std::optional<TForm NewHead.Offset = StartOffset; } - TKey newKey(TKeyPrefix::TypeData, - Partition, - StartOffset, - oldKey.GetPartNo(), - oldKey.GetCount(), - oldKey.GetInternalPartsCount(), - oldKey.IsHead()); + auto newKey = TKey::FromKey(oldKey, TKeyPrefix::TypeData, Partition, StartOffset); FormedBlobs.emplace_back(oldKey, newKey, size); diff --git a/ydb/core/persqueue/cache_eviction.h b/ydb/core/persqueue/cache_eviction.h index 767ffa0bf6d..09aeda4e1b5 100644 --- a/ydb/core/persqueue/cache_eviction.h +++ b/ydb/core/persqueue/cache_eviction.h @@ -106,7 +106,7 @@ namespace NKikimr::NPQ { for (auto& blob : Blobs) { if (blob.Value.empty()) { // add reading command - TKey key(TKeyPrefix::TypeData, Partition, blob.Offset, blob.PartNo, blob.Count, blob.InternalPartsCount); + auto key = TKey::ForBody(TKeyPrefix::TypeData, Partition, blob.Offset, blob.PartNo, blob.Count, blob.InternalPartsCount); auto read = request->Record.AddCmdRead(); read->SetKey(key.Data(), key.Size()); } @@ -142,7 +142,7 @@ namespace NKikimr::NPQ { } void Verify(const TRequestedBlob& blob) const { - TKey key(TKeyPrefix::TypeData, TPartitionId(0), blob.Offset, blob.PartNo, blob.Count, blob.InternalPartsCount, false); + auto key = TKey::ForBody(TKeyPrefix::TypeData, TPartitionId(0), blob.Offset, blob.PartNo, blob.Count, blob.InternalPartsCount); Y_ABORT_UNLESS(blob.Value.size() == blob.Size); TClientBlob::CheckBlob(key, blob.Value); } @@ -324,7 +324,7 @@ namespace NKikimr::NPQ { partitionId.InternalPartitionId = partitionId.OriginalPartitionId; return {partitionId, 0, 0, 0, 0}; } else { - TKey key(s); + auto key = TKey::FromString(s); return {key.GetPartition(), key.GetOffset(), key.GetPartNo(), key.GetCount(), key.GetInternalPartsCount()}; } } diff --git a/ydb/core/persqueue/key.cpp b/ydb/core/persqueue/key.cpp index 88319f2f391..e3794e8c62a 100644 --- a/ydb/core/persqueue/key.cpp +++ b/ydb/core/persqueue/key.cpp @@ -10,7 +10,7 @@ std::pair<TKeyPrefix, TKeyPrefix> MakeKeyPrefixRange(TKeyPrefix::EType type, con return {std::move(from), std::move(to)}; } -TKey MakeKeyFromString(const TString& s, const TPartitionId& partition) +TKey TKey::FromString(const TString& s, const TPartitionId& partition) { TKey t(s); return TKey(t.GetType(), @@ -19,7 +19,45 @@ TKey MakeKeyFromString(const TString& s, const TPartitionId& partition) t.GetPartNo(), t.GetCount(), t.GetInternalPartsCount(), - t.IsHead()); + t.GetSuffix()); +} + +TKey TKey::ForBody(EType type, + const TPartitionId& partition, + const ui64 offset, + const ui16 partNo, + const ui32 count, + const ui16 internalPartsCount) +{ + return {type, partition, offset, partNo, count, internalPartsCount, Nothing()}; +} + +TKey TKey::ForHead(EType type, + const TPartitionId& partition, + const ui64 offset, + const ui16 partNo, + const ui32 count, + const ui16 internalPartsCount) +{ + return {type, partition, offset, partNo, count, internalPartsCount, '|'}; +} + +TKey TKey::ForFastWrite(EType type, + const TPartitionId& partition, + const ui64 offset, + const ui16 partNo, + const ui32 count, + const ui16 internalPartsCount) +{ + return {type, partition, offset, partNo, count, internalPartsCount, '?'}; +} + +TKey TKey::FromKey(const TKey& k, + EType type, + const TPartitionId& partition, + ui64 offset) +{ + return {type, partition, offset, k.GetPartNo(), k.GetCount(), k.GetInternalPartsCount(), k.GetSuffix()}; } void TKeyPrefix::SetTypeImpl(EType type, bool isServicePartition) diff --git a/ydb/core/persqueue/key.h b/ydb/core/persqueue/key.h index f9cddc6a3ca..c303fd948f6 100644 --- a/ydb/core/persqueue/key.h +++ b/ydb/core/persqueue/key.h @@ -11,7 +11,7 @@ namespace NKikimr { namespace NPQ { -// {char type; ui32 partiton; (char mark)} +// {char type; ui32 partition; (char mark)} class TKeyPrefix : public TBuffer { public: @@ -54,7 +54,7 @@ public: {} TString ToString() const { - return TString(Data(), Size()); + return {Data(), Size()}; } bool Marked(EMark mark) { @@ -134,7 +134,7 @@ private: std::pair<TKeyPrefix, TKeyPrefix> MakeKeyPrefixRange(TKeyPrefix::EType type, const TPartitionId& partition); -// {char type; ui32 partiton; ui64 offset; ui16 partNo; ui32 count, ui16 internalPartsCount} +// {char type; ui32 partition; ui64 offset; ui16 partNo; ui32 count, ui16 internalPartsCount} // offset, partNo - index of first rec // count - diff of last record offset and first record offset in blob // internalPartsCount - number of internal parts @@ -144,105 +144,90 @@ std::pair<TKeyPrefix, TKeyPrefix> MakeKeyPrefixRange(TKeyPrefix::EType type, con class TKey : public TKeyPrefix { public: - TKey(EType type, const TPartitionId& partition, const ui64 offset, const ui16 partNo, const ui32 count, const ui16 internalPartsCount, const bool isHead = false) - : TKeyPrefix(type, partition) - , Offset(offset) - , Count(count) - , PartNo(partNo) - , InternalPartsCount(internalPartsCount) - { - Resize(KeySize()); - *(PtrOffset() - 1) = *(PtrCount() - 1) = *(PtrPartNo() - 1) = *(PtrInternalPartsCount() - 1) = '_'; - SetOffset(offset); - SetPartNo(partNo); - SetCount(count); - SetInternalPartsCount(InternalPartsCount); - SetHead(isHead); - } + static TKey ForBody(EType type, + const TPartitionId& partition, + const ui64 offset, + const ui16 partNo, + const ui32 count, + const ui16 internalPartsCount); + static TKey ForHead(EType type, + const TPartitionId& partition, + const ui64 offset, + const ui16 partNo, + const ui32 count, + const ui16 internalPartsCount); + static TKey ForFastWrite(EType type, + const TPartitionId& partition, + const ui64 offset, + const ui16 partNo, + const ui32 count, + const ui16 internalPartsCount); + + static TKey FromString(const TString& s) { return {s}; } + static TKey FromString(const TString& s, const TPartitionId& partition); + + static TKey FromKey(const TKey& k, + EType type, + const TPartitionId& partitionId, + ui64 offset); + + TKey() + : TKey(TypeNone, TPartitionId(0), 0, 0, 0, 0, false) + {} TKey(const TKey& key) - : TKey(key.GetType(), key.GetPartition(), key.Offset, key.PartNo, key.Count, key.InternalPartsCount, key.IsHead()) + : TKey(key.GetType(), key.GetPartition(), key.Offset, key.PartNo, key.Count, key.InternalPartsCount, key.GetSuffix()) { } - TKey(const TString& data) - { - Assign(data.data(), data.size()); - Y_ABORT_UNLESS(data.size() == KeySize() + IsHead()); - Y_ABORT_UNLESS(*(PtrOffset() - 1) == '_'); - Y_ABORT_UNLESS(*(PtrCount() - 1) == '_'); - Y_ABORT_UNLESS(*(PtrPartNo() - 1) == '_'); - Y_ABORT_UNLESS(*(PtrInternalPartsCount() - 1) == '_'); - - ParsePartition(); - ParseOffset(); - ParseCount(); - ParsePartNo(); - ParseInternalPartsCount(); - } - - TKey() - : TKey(TypeNone, TPartitionId(0), 0, 0, 0, 0) - {} - virtual ~TKey() {} - TString ToString() const { - return TString(Data(), Size()); - } - - void SetHead(const bool isHead) { - Resize(KeySize() + isHead); - if (isHead) - Data()[KeySize()] = '|'; - } - void SetOffset(const ui64 offset) { - Y_ABORT_UNLESS(Size() == KeySize() + IsHead()); + Y_ABORT_UNLESS(Size() == KeySize() + HasSuffix()); Offset = offset; memcpy(PtrOffset(), Sprintf("%.20" PRIu64, offset).data(), 20); } ui64 GetOffset() const { - Y_ABORT_UNLESS(Size() == KeySize() + IsHead()); + Y_ABORT_UNLESS(Size() == KeySize() + HasSuffix()); return Offset; } void SetCount(const ui32 count) { - Y_ABORT_UNLESS(Size() == KeySize() + IsHead()); + Y_ABORT_UNLESS(Size() == KeySize() + HasSuffix()); Count = count; memcpy(PtrCount(), Sprintf("%.10" PRIu32, count).data(), 10); } ui32 GetCount() const { - Y_ABORT_UNLESS(Size() == KeySize() + IsHead()); + Y_ABORT_UNLESS(Size() == KeySize() + HasSuffix()); return Count; } void SetPartNo(const ui16 partNo) { - Y_ABORT_UNLESS(Size() == KeySize() + IsHead()); + Y_ABORT_UNLESS(Size() == KeySize() + HasSuffix()); PartNo = partNo; memcpy(PtrPartNo(), Sprintf("%.5" PRIu16, partNo).data(), 5); } ui16 GetPartNo() const { - Y_ABORT_UNLESS(Size() == KeySize() + IsHead()); + Y_ABORT_UNLESS(Size() == KeySize() + HasSuffix()); return PartNo; } void SetInternalPartsCount(const ui16 internalPartsCount) { - Y_ABORT_UNLESS(Size() == KeySize() + IsHead()); + Y_ABORT_UNLESS(Size() == KeySize() + HasSuffix()); InternalPartsCount = internalPartsCount; memcpy(PtrInternalPartsCount(), Sprintf("%.5" PRIu16, internalPartsCount).data(), 5); } ui16 GetInternalPartsCount() const { - Y_ABORT_UNLESS(Size() == KeySize() + IsHead()); + Y_ABORT_UNLESS(Size() == KeySize() + HasSuffix()); return InternalPartsCount; } - bool IsHead() const { + bool HasSuffix() const { return Size() == KeySize() + 1; } @@ -257,6 +242,38 @@ public: } private: + TKey(EType type, const TPartitionId& partition, const ui64 offset, const ui16 partNo, const ui32 count, const ui16 internalPartsCount, const TMaybe<char> suffix) + : TKeyPrefix(type, partition) + , Offset(offset) + , Count(count) + , PartNo(partNo) + , InternalPartsCount(internalPartsCount) + { + Resize(KeySize()); + *(PtrOffset() - 1) = *(PtrCount() - 1) = *(PtrPartNo() - 1) = *(PtrInternalPartsCount() - 1) = '_'; + SetOffset(offset); + SetPartNo(partNo); + SetCount(count); + SetInternalPartsCount(InternalPartsCount); + SetSuffix(suffix); + } + + TKey(const TString& data) + { + Assign(data.data(), data.size()); + Y_ABORT_UNLESS(data.size() == KeySize() + HasSuffix()); + Y_ABORT_UNLESS(*(PtrOffset() - 1) == '_'); + Y_ABORT_UNLESS(*(PtrCount() - 1) == '_'); + Y_ABORT_UNLESS(*(PtrPartNo() - 1) == '_'); + Y_ABORT_UNLESS(*(PtrInternalPartsCount() - 1) == '_'); + + ParsePartition(); + ParseOffset(); + ParseCount(); + ParsePartNo(); + ParseInternalPartsCount(); + } + char* PtrOffset() { return Data() + UnmarkedSize() + 1; } char* PtrPartNo() { return PtrOffset() + 20 + 1; } char* PtrCount() { return PtrPartNo() + 5 + 1; } @@ -287,14 +304,28 @@ private: InternalPartsCount = FromString<ui16>(TStringBuf{PtrInternalPartsCount(), 5}); } + TMaybe<char> GetSuffix() const + { + if (HasSuffix()) { + return Data()[KeySize()]; + } + return Nothing(); + } + + void SetSuffix(TMaybe<char> suffix) + { + Resize(KeySize() + suffix.Defined()); + if (suffix.Defined()) { + Data()[KeySize()] = *suffix; + } + } + ui64 Offset; ui32 Count; ui16 PartNo; ui16 InternalPartsCount; }; -TKey MakeKeyFromString(const TString& s, const TPartitionId& partition); - inline TString GetTxKey(ui64 txId) { diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index 9a37c987e1c..73bbd97f007 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -919,7 +919,7 @@ TInstant TPartition::GetWriteTimeEstimate(ui64 offset) const { Y_ABORT_UNLESS(it != container.begin(), "Tablet %lu StartOffset %lu, HeadOffset %lu, offset %lu, containter size %lu, first-elem: %s", TabletID, StartOffset, Head.Offset, offset, container.size(), - container.front().Key.ToString().c_str()); + container.front().Key.ToString().data()); Y_ABORT_UNLESS(it == container.end() || it->Key.GetOffset() > offset || it->Key.GetOffset() == offset && it->Key.GetPartNo() > 0); diff --git a/ydb/core/persqueue/partition_init.cpp b/ydb/core/persqueue/partition_init.cpp index 1e134088279..863b85facba 100644 --- a/ydb/core/persqueue/partition_init.cpp +++ b/ydb/core/persqueue/partition_init.cpp @@ -516,7 +516,7 @@ THashSet<TString> FilterBlobsMetaData(const NKikimrClient::TKeyValueResponse::TR for (ui32 i = 0; i < range.PairSize(); ++i) { const auto& pair = range.GetPair(i); Y_ABORT_UNLESS(pair.GetStatus() == NKikimrProto::OK); //this is readrange without keys, only OK could be here - source.push_back(MakeKeyFromString(pair.GetKey(), partitionId)); + source.push_back(TKey::FromString(pair.GetKey(), partitionId)); } auto isKeyLess = [](const TKey& lhs, const TKey& rhs) { @@ -590,7 +590,7 @@ void TInitDataRangeStep::FillBlobsMetaData(const NKikimrClient::TKeyValueRespons for (ui32 i = 0; i < range.PairSize(); ++i) { const auto& pair = range.GetPair(i); Y_ABORT_UNLESS(pair.GetStatus() == NKikimrProto::OK); //this is readrange without keys, only OK could be here - TKey k = MakeKeyFromString(pair.GetKey(), PartitionId()); + auto k = TKey::FromString(pair.GetKey(), PartitionId()); if (!actualKeys.contains(pair.GetKey())) { Partition()->DeletedKeys.emplace_back(k.ToString()); continue; @@ -612,7 +612,7 @@ void TInitDataRangeStep::FillBlobsMetaData(const NKikimrClient::TKeyValueRespons Y_ABORT_UNLESS(k.GetOffset() >= endOffset); endOffset = k.GetOffset() + k.GetCount(); //at this point EndOffset > StartOffset - if (!k.IsHead()) //head.Size will be filled after read or head blobs + if (!k.HasSuffix()) //head.Size will be filled after read or head blobs bodySize += pair.GetValueSize(); PQ_LOG_D("Got data offset " << k.GetOffset() << " count " << k.GetCount() << " size " << pair.GetValueSize() @@ -639,7 +639,7 @@ void TInitDataRangeStep::FormHeadAndProceed() { head.Offset = endOffset; head.PartNo = 0; - while (dataKeysBody.size() > 0 && dataKeysBody.back().Key.IsHead()) { + while (dataKeysBody.size() > 0 && dataKeysBody.back().Key.HasSuffix()) { Y_ABORT_UNLESS(dataKeysBody.back().Key.GetOffset() + dataKeysBody.back().Key.GetCount() == head.Offset); //no gaps in head allowed headKeys.push_front(dataKeysBody.back()); head.Offset = dataKeysBody.back().Key.GetOffset(); @@ -647,7 +647,7 @@ void TInitDataRangeStep::FormHeadAndProceed() { dataKeysBody.pop_back(); } for (const auto& p : dataKeysBody) { - Y_ABORT_UNLESS(!p.Key.IsHead()); + Y_ABORT_UNLESS(!p.Key.HasSuffix()); } Y_ABORT_UNLESS(headKeys.empty() || head.Offset == headKeys.front().Key.GetOffset() && head.PartNo == headKeys.front().Key.GetPartNo()); @@ -707,7 +707,7 @@ void TInitDataStep::Handle(TEvKeyValue::TEvResponse::TPtr &ev, const TActorConte switch(read.GetStatus()) { case NKikimrProto::OK: { const TKey& key = headKeys[i].Key; - Y_ABORT_UNLESS(key.IsHead()); + Y_ABORT_UNLESS(key.HasSuffix()); ui32 size = headKeys[i].Size; ui64 offset = key.GetOffset(); diff --git a/ydb/core/persqueue/partition_read.cpp b/ydb/core/persqueue/partition_read.cpp index 4161ea3fd99..faf618176a8 100644 --- a/ydb/core/persqueue/partition_read.cpp +++ b/ydb/core/persqueue/partition_read.cpp @@ -490,7 +490,7 @@ TReadAnswer TReadInfo::FormAnswer( } Y_ABORT_UNLESS(offset <= Offset); Y_ABORT_UNLESS(offset < Offset || partNo <= PartNo); - TKey key(TKeyPrefix::TypeData, TPartitionId(0), offset, partNo, count, internalPartsCount, false); + auto key = TKey::ForBody(TKeyPrefix::TypeData, TPartitionId(0), offset, partNo, count, internalPartsCount); ui64 firstHeaderOffset = GetFirstHeaderOffset(key, blobValue); for (TBlobIterator it(key, blobValue); it.IsValid() && !needStop; it.Next()) { TBatch batch = it.GetBatch(); diff --git a/ydb/core/persqueue/partition_write.cpp b/ydb/core/persqueue/partition_write.cpp index c590b3b6e4e..efe86e104da 100644 --- a/ydb/core/persqueue/partition_write.cpp +++ b/ydb/core/persqueue/partition_write.cpp @@ -419,7 +419,7 @@ void TPartition::SyncMemoryStateWithKVState(const TActorContext& ctx) { while (!CompactedKeys.empty()) { const auto& ck = CompactedKeys.front(); BodySize += ck.second; - Y_ABORT_UNLESS(!ck.first.IsHead()); + Y_ABORT_UNLESS(!ck.first.HasSuffix()); ui64 lastOffset = DataKeysBody.empty() ? 0 : (DataKeysBody.back().Key.GetOffset() + DataKeysBody.back().Key.GetCount()); Y_ABORT_UNLESS(lastOffset <= ck.first.GetOffset()); if (DataKeysBody.empty()) { @@ -1060,10 +1060,10 @@ void TPartition::AddCmdWrite(const std::optional<TPartitionedBlob::TFormedBlobIn const TActorContext& ctx) { auto write = request->Record.AddCmdWrite(); - write->SetKey(newWrite->Key.ToString()); + write->SetKey(newWrite->Key.Data(), newWrite->Key.Size()); write->SetValue(newWrite->Value); - Y_ABORT_UNLESS(!newWrite->Key.IsHead()); - auto channel = GetChannel(NextChannel(newWrite->Key.IsHead(), newWrite->Value.size())); + Y_ABORT_UNLESS(!newWrite->Key.HasSuffix()); + auto channel = GetChannel(NextChannel(newWrite->Key.HasSuffix(), newWrite->Value.size())); write->SetStorageChannel(channel); write->SetTactic(AppData(ctx)->PQConfig.GetTactic()); @@ -1115,10 +1115,10 @@ ui32 TPartition::RenameTmpCmdWrites(TEvKeyValue::TEvRequest* request) { ui32 curWrites = 0; for (ui32 i = 0; i < request->Record.CmdWriteSize(); ++i) { //change keys for yet to be writed KV pairs - TKey key(request->Record.GetCmdWrite(i).GetKey()); + auto key = TKey::FromString(request->Record.GetCmdWrite(i).GetKey()); if (key.GetType() == TKeyPrefix::TypeTmpData) { key.SetType(TKeyPrefix::TypeData); - request->Record.MutableCmdWrite(i)->SetKey(TString(key.Data(), key.Size())); + request->Record.MutableCmdWrite(i)->SetKey(key.Data(), key.Size()); ++curWrites; } } @@ -1259,7 +1259,7 @@ bool TPartition::ExecRequest(TWriteMsg& p, ProcessParameters& parameters, TEvKey auto oldCmdWrite = request->Record.GetCmdWrite(); request->Record.ClearCmdWrite(); for (ui32 i = 0; i < (ui32)oldCmdWrite.size(); ++i) { - TKey key(oldCmdWrite.Get(i).GetKey()); + auto key = TKey::FromString(oldCmdWrite.Get(i).GetKey()); if (key.GetType() != TKeyPrefix::TypeTmpData) { request->Record.AddCmdWrite()->CopyFrom(oldCmdWrite.Get(i)); } @@ -1386,7 +1386,12 @@ bool TPartition::ExecRequest(TWriteMsg& p, ProcessParameters& parameters, TEvKey std::pair<TKey, ui32> TPartition::GetNewWriteKeyImpl(bool headCleared, bool needCompaction, ui32 HeadSize) { - TKey key(TKeyPrefix::TypeData, Partition, NewHead.Offset, NewHead.PartNo, NewHead.GetCount(), NewHead.GetInternalPartsCount(), !needCompaction); + TKey key; + if (needCompaction) { + key = TKey::ForBody(TKeyPrefix::TypeData, Partition, NewHead.Offset, NewHead.PartNo, NewHead.GetCount(), NewHead.GetInternalPartsCount()); + } else { + key = TKey::ForHead(TKeyPrefix::TypeData, Partition, NewHead.Offset, NewHead.PartNo, NewHead.GetCount(), NewHead.GetInternalPartsCount()); + } if (NewHead.PackedSize > 0) DataKeysHead[TotalLevels - 1].AddKey(key, NewHead.PackedSize); @@ -1399,13 +1404,13 @@ std::pair<TKey, ui32> TPartition::GetNewWriteKeyImpl(bool headCleared, bool need DataKeysHead[i].Clear(); } if (!headCleared) { //compacted blob must contain both head and NewHead - key = TKey(TKeyPrefix::TypeData, Partition, Head.Offset, Head.PartNo, NewHead.GetCount() + Head.GetCount(), - Head.GetInternalPartsCount() + NewHead.GetInternalPartsCount(), false); - } //otherwise KV blob is not from head (!key.IsHead()) and contains only new data from NewHead + key = TKey::ForBody(TKeyPrefix::TypeData, Partition, Head.Offset, Head.PartNo, NewHead.GetCount() + Head.GetCount(), + Head.GetInternalPartsCount() + NewHead.GetInternalPartsCount()); + } //otherwise KV blob is not from head (!key.HasSuffix()) and contains only new data from NewHead res = std::make_pair(key, HeadSize + NewHead.PackedSize); } else { res = Compact(key, NewHead.PackedSize, headCleared); - Y_ABORT_UNLESS(res.first.IsHead());//may compact some KV blobs from head, but new KV blob is from head too + Y_ABORT_UNLESS(res.first.HasSuffix());//may compact some KV blobs from head, but new KV blob is from head too Y_ABORT_UNLESS(res.second >= NewHead.PackedSize); //at least new data must be writed } Y_ABORT_UNLESS(res.second <= MaxBlobSize); @@ -1455,7 +1460,7 @@ void TPartition::AddNewWriteBlob(std::pair<TKey, ui32>& res, TEvKeyValue::TEvReq Y_ABORT_UNLESS(res.second >= valueD.size()); - if (res.second > valueD.size() && res.first.IsHead()) { //change to real size if real packed size is smaller + if (res.second > valueD.size() && res.first.HasSuffix()) { //change to real size if real packed size is smaller Y_ABORT("Can't be here right now, only after merging of small batches"); @@ -1477,7 +1482,7 @@ void TPartition::AddNewWriteBlob(std::pair<TKey, ui32>& res, TEvKeyValue::TEvReq } } - Y_ABORT_UNLESS(res.second == valueD.size() || res.first.IsHead()); + Y_ABORT_UNLESS(res.second == valueD.size() || res.first.HasSuffix()); TClientBlob::CheckBlob(key, valueD); @@ -1485,12 +1490,12 @@ void TPartition::AddNewWriteBlob(std::pair<TKey, ui32>& res, TEvKeyValue::TEvReq write->SetKey(key.Data(), key.Size()); write->SetValue(valueD); - bool isInline = key.IsHead() && valueD.size() < MAX_INLINE_SIZE; + bool isInline = key.HasSuffix() && valueD.size() < MAX_INLINE_SIZE; if (isInline) write->SetStorageChannel(NKikimrClient::TKeyValueRequest::INLINE); else { - auto channel = GetChannel(NextChannel(key.IsHead(), valueD.size())); + auto channel = GetChannel(NextChannel(key.HasSuffix(), valueD.size())); write->SetStorageChannel(channel); write->SetTactic(AppData(ctx)->PQConfig.GetTactic()); } @@ -1499,7 +1504,7 @@ void TPartition::AddNewWriteBlob(std::pair<TKey, ui32>& res, TEvKeyValue::TEvReq const TKey& k = CompactedKeys.empty() ? key : CompactedKeys.front().first; ClearOldHead(k.GetOffset(), k.GetPartNo(), request); - if (!key.IsHead()) { + if (!key.HasSuffix()) { if (!DataKeysBody.empty() && CompactedKeys.empty()) { Y_ABORT_UNLESS(DataKeysBody.back().Key.GetOffset() + DataKeysBody.back().Key.GetCount() <= key.GetOffset(), "LAST KEY %s, HeadOffset %lu, NEWKEY %s", DataKeysBody.back().Key.ToString().c_str(), Head.Offset, key.ToString().c_str()); diff --git a/ydb/core/persqueue/read.h b/ydb/core/persqueue/read.h index d28d5f459f4..1891092e271 100644 --- a/ydb/core/persqueue/read.h +++ b/ydb/core/persqueue/read.h @@ -289,7 +289,7 @@ namespace NPQ { Y_ABORT_UNLESS((strKey.size() >= TKey::KeySize()) && (strKey.size() - TKey::KeySize() <= 1), "Unexpected key size: %" PRIu64 " (%s)", strKey.size(), strKey.data()); - TKey key(strKey); + auto key = TKey::FromString(strKey); const TString& value = cmd.GetValue(); kvReq.Partition = key.GetPartition(); diff --git a/ydb/core/persqueue/ut/internals_ut.cpp b/ydb/core/persqueue/ut/internals_ut.cpp index 0a2477bc3aa..1d3a273432e 100644 --- a/ydb/core/persqueue/ut/internals_ut.cpp +++ b/ydb/core/persqueue/ut/internals_ut.cpp @@ -299,26 +299,69 @@ Y_UNIT_TEST(TestToHex) { } Y_UNIT_TEST(StoreKeys) { - TKey keyOld(TKeyPrefix::TypeData, TPartitionId{9}, 8, 7, 6, 5, false); + // key for Body + auto keyOld = TKey::ForBody(TKeyPrefix::TypeData, TPartitionId{9}, 8, 7, 6, 5); UNIT_ASSERT_VALUES_EQUAL(keyOld.ToString(), "d0000000009_00000000000000000008_00007_0000000006_00005"); - TKey keyNew(TKeyPrefix::TypeData, TPartitionId{5, TWriteId{0, 1}, 9}, 8, 7, 6, 5, false); + auto keyNew = TKey::ForBody(TKeyPrefix::TypeData, TPartitionId{5, TWriteId{0, 1}, 9}, 8, 7, 6, 5); UNIT_ASSERT_VALUES_EQUAL(keyNew.ToString(), "D0000000009_00000000000000000008_00007_0000000006_00005"); keyNew.SetType(TKeyPrefix::TypeInfo); UNIT_ASSERT_VALUES_EQUAL(keyNew.ToString(), "M0000000009_00000000000000000008_00007_0000000006_00005"); + + // key for Head + auto keyHead = TKey::ForHead(TKeyPrefix::TypeData, TPartitionId{9}, 8, 7, 6, 5); + UNIT_ASSERT_VALUES_EQUAL(keyHead.ToString(), "d0000000009_00000000000000000008_00007_0000000006_00005|"); + + keyHead = TKey::FromKey(keyHead, TKeyPrefix::TypeData, TPartitionId{10}, 11); + UNIT_ASSERT_VALUES_EQUAL(keyHead.ToString(), "d0000000010_00000000000000000011_00007_0000000006_00005|"); + + // key for FastWrite + auto keyFastWrite = TKey::ForFastWrite(TKeyPrefix::TypeData, TPartitionId{9}, 8, 7, 6, 5); + UNIT_ASSERT_VALUES_EQUAL(keyFastWrite.ToString(), "d0000000009_00000000000000000008_00007_0000000006_00005?"); + + keyFastWrite = TKey::FromKey(keyFastWrite, TKeyPrefix::TypeData, TPartitionId{12}, 13); + UNIT_ASSERT_VALUES_EQUAL(keyFastWrite.ToString(), "d0000000012_00000000000000000013_00007_0000000006_00005?"); } Y_UNIT_TEST(RestoreKeys) { + // the key from the string { - TKey key("X0000000001_00000000000000000002_00003_0000000004_00005"); + auto key = TKey::FromString("X0000000001_00000000000000000002_00003_0000000004_00005"); UNIT_ASSERT(key.GetType() == TKeyPrefix::TypeTmpData); UNIT_ASSERT_VALUES_EQUAL(key.GetPartition().InternalPartitionId, 1); + UNIT_ASSERT_VALUES_EQUAL(key.GetOffset(), 2); + UNIT_ASSERT_VALUES_EQUAL(key.GetPartNo(), 3); + UNIT_ASSERT_VALUES_EQUAL(key.GetCount(), 4); + UNIT_ASSERT_VALUES_EQUAL(key.GetInternalPartsCount(), 5); + UNIT_ASSERT(!key.HasSuffix()); } + + // blob type { - TKey key("i0000000001_00000000000000000002_00003_0000000004_00005"); + auto key = TKey::FromString("i0000000001_00000000000000000002_00003_0000000004_00005"); UNIT_ASSERT(key.GetType() == TKeyPrefix::TypeMeta); - UNIT_ASSERT_VALUES_EQUAL(key.GetPartition().InternalPartitionId, 1); + } + + // the `partitionId` is being replaced + { + auto key = TKey::FromString("d0000000002_00000000000000000013_00007_0000000006_00005", TPartitionId{3}); + UNIT_ASSERT_VALUES_EQUAL(key.GetPartition().InternalPartitionId, 3); + UNIT_ASSERT(!key.HasSuffix()); + } + + // key for FastWrite + { + auto key = TKey::FromString("d0000000002_00000000000000000013_00007_0000000006_00005?", TPartitionId{4}); + UNIT_ASSERT_VALUES_EQUAL(key.GetPartition().InternalPartitionId, 4); + UNIT_ASSERT(key.HasSuffix()); + } + + // key for head + { + auto key = TKey::FromString("d0000000002_00000000000000000013_00007_0000000006_00005|", TPartitionId{8}); + UNIT_ASSERT_VALUES_EQUAL(key.GetPartition().InternalPartitionId, 8); + UNIT_ASSERT(key.HasSuffix()); } } diff --git a/ydb/core/persqueue/ut/partition_ut.cpp b/ydb/core/persqueue/ut/partition_ut.cpp index 1f67f68deed..234f734f54a 100644 --- a/ydb/core/persqueue/ut/partition_ut.cpp +++ b/ydb/core/persqueue/ut/partition_ut.cpp @@ -935,9 +935,16 @@ void TPartitionFixture::SendDataRangeResponse(ui32 partitionId, auto read = event->Record.AddReadRangeResult(); read->SetStatus(NKikimrProto::OK); auto pair = read->AddPair(); - NPQ::TKey key(NPQ::TKeyPrefix::TypeData, TPartitionId(partitionId), begin, 0, end - begin, 0, isHead); + + TKey key; + if (isHead) { + key = TKey::ForHead(TKeyPrefix::TypeData, TPartitionId(partitionId), begin, 0, end - begin, 0); + } else { + key = TKey::ForBody(TKeyPrefix::TypeData, TPartitionId(partitionId), begin, 0, end - begin, 0); + } + pair->SetStatus(NKikimrProto::OK); - pair->SetKey(key.ToString()); + pair->SetKey(key.Data(), key.Size()); pair->SetValueSize(684); pair->SetCreationUnixTime(TInstant::Now().Seconds()); diff --git a/ydb/core/persqueue/ut/pq_ut.cpp b/ydb/core/persqueue/ut/pq_ut.cpp index f4135596b4d..64c3a99f6b3 100644 --- a/ydb/core/persqueue/ut/pq_ut.cpp +++ b/ydb/core/persqueue/ut/pq_ut.cpp @@ -2036,12 +2036,12 @@ Y_UNIT_TEST(TestPQCacheSizeManagement) { Y_UNIT_TEST(TestOffsetEstimation) { std::deque<NPQ::TDataKey> container = { - {NPQ::TKey(NPQ::TKeyPrefix::EType::TypeNone, TPartitionId(0), 1, 0, 0, 0), 0, TInstant::Seconds(1), 10}, - {NPQ::TKey(NPQ::TKeyPrefix::EType::TypeNone, TPartitionId(0), 2, 0, 0, 0), 0, TInstant::Seconds(1), 10}, - {NPQ::TKey(NPQ::TKeyPrefix::EType::TypeNone, TPartitionId(0), 3, 0, 0, 0), 0, TInstant::Seconds(2), 10}, - {NPQ::TKey(NPQ::TKeyPrefix::EType::TypeNone, TPartitionId(0), 4, 0, 0, 0), 0, TInstant::Seconds(2), 10}, - {NPQ::TKey(NPQ::TKeyPrefix::EType::TypeNone, TPartitionId(0), 5, 0, 0, 0), 0, TInstant::Seconds(3), 10}, - {NPQ::TKey(NPQ::TKeyPrefix::EType::TypeNone, TPartitionId(0), 6, 0, 0, 0), 0, TInstant::Seconds(3), 10}, + {NPQ::TKey::ForBody(NPQ::TKeyPrefix::EType::TypeNone, TPartitionId(0), 1, 0, 0, 0), 0, TInstant::Seconds(1), 10}, + {NPQ::TKey::ForBody(NPQ::TKeyPrefix::EType::TypeNone, TPartitionId(0), 2, 0, 0, 0), 0, TInstant::Seconds(1), 10}, + {NPQ::TKey::ForBody(NPQ::TKeyPrefix::EType::TypeNone, TPartitionId(0), 3, 0, 0, 0), 0, TInstant::Seconds(2), 10}, + {NPQ::TKey::ForBody(NPQ::TKeyPrefix::EType::TypeNone, TPartitionId(0), 4, 0, 0, 0), 0, TInstant::Seconds(2), 10}, + {NPQ::TKey::ForBody(NPQ::TKeyPrefix::EType::TypeNone, TPartitionId(0), 5, 0, 0, 0), 0, TInstant::Seconds(3), 10}, + {NPQ::TKey::ForBody(NPQ::TKeyPrefix::EType::TypeNone, TPartitionId(0), 6, 0, 0, 0), 0, TInstant::Seconds(3), 10}, }; UNIT_ASSERT_EQUAL(NPQ::GetOffsetEstimate({}, TInstant::MilliSeconds(0), 9999), 9999); UNIT_ASSERT_EQUAL(NPQ::GetOffsetEstimate(container, TInstant::MilliSeconds(0), 9999), 1); diff --git a/ydb/core/persqueue/ut/user_action_processor_ut.cpp b/ydb/core/persqueue/ut/user_action_processor_ut.cpp index 0796c620f39..27ed3ed4ad6 100644 --- a/ydb/core/persqueue/ut/user_action_processor_ut.cpp +++ b/ydb/core/persqueue/ut/user_action_processor_ut.cpp @@ -633,7 +633,7 @@ void TUserActionProcessorFixture::SendDataRangeResponse(ui64 begin, ui64 end) auto pair = read->AddPair(); NPQ::TKey key(NPQ::TKeyPrefix::TypeData, 1, begin, 0, end - begin, 0); pair->SetStatus(NKikimrProto::OK); - pair->SetKey(key.ToString()); + pair->SetKey(key.Data(), key.Size()); //pair->SetValueSize(); pair->SetCreationUnixTime(0); |