aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlek5andr-Kotov <akotov@ydb.tech>2025-03-27 17:57:36 +0300
committerGitHub <noreply@github.com>2025-03-27 14:57:36 +0000
commit47cc8b277d0188a6edc257e188843affd231d156 (patch)
treef29f439dc897377ea086794eff050baa96bb72fa
parent6b8ffd922403171f9968f00cc9d44857a33af2b7 (diff)
downloadydb-47cc8b277d0188a6edc257e188843affd231d156.tar.gz
Keys for blocks in the PQ tablet (#16337)
-rw-r--r--ydb/core/persqueue/blob.cpp12
-rw-r--r--ydb/core/persqueue/cache_eviction.h6
-rw-r--r--ydb/core/persqueue/key.cpp42
-rw-r--r--ydb/core/persqueue/key.h151
-rw-r--r--ydb/core/persqueue/partition.cpp2
-rw-r--r--ydb/core/persqueue/partition_init.cpp12
-rw-r--r--ydb/core/persqueue/partition_read.cpp2
-rw-r--r--ydb/core/persqueue/partition_write.cpp39
-rw-r--r--ydb/core/persqueue/read.h2
-rw-r--r--ydb/core/persqueue/ut/internals_ut.cpp53
-rw-r--r--ydb/core/persqueue/ut/partition_ut.cpp11
-rw-r--r--ydb/core/persqueue/ut/pq_ut.cpp12
-rw-r--r--ydb/core/persqueue/ut/user_action_processor_ut.cpp2
13 files changed, 232 insertions, 114 deletions
diff --git a/ydb/core/persqueue/blob.cpp b/ydb/core/persqueue/blob.cpp
index 084fbd9c6b9..54f7dd53bf4 100644
--- a/ydb/core/persqueue/blob.cpp
+++ b/ydb/core/persqueue/blob.cpp
@@ -883,8 +883,8 @@ auto TPartitionedBlob::CreateFormedBlob(ui32 size, bool useRename) -> std::optio
Y_ABORT_UNLESS(NewHead.GetNextOffset() >= (GlueHead ? Head.Offset : NewHead.Offset));
- TKey tmpKey(TKeyPrefix::TypeTmpData, Partition, StartOffset, StartPartNo, count, InternalPartsCount, false);
- TKey dataKey(TKeyPrefix::TypeData, Partition, StartOffset, StartPartNo, count, InternalPartsCount, false);
+ auto tmpKey = TKey::ForBody(TKeyPrefix::TypeTmpData, Partition, StartOffset, StartPartNo, count, InternalPartsCount);
+ auto dataKey = TKey::ForBody(TKeyPrefix::TypeData, Partition, StartOffset, StartPartNo, count, InternalPartsCount);
StartOffset = Offset;
StartPartNo = NextPartNo;
@@ -955,13 +955,7 @@ auto TPartitionedBlob::Add(const TKey& oldKey, ui32 size) -> std::optional<TForm
NewHead.Offset = StartOffset;
}
- TKey newKey(TKeyPrefix::TypeData,
- Partition,
- StartOffset,
- oldKey.GetPartNo(),
- oldKey.GetCount(),
- oldKey.GetInternalPartsCount(),
- oldKey.IsHead());
+ auto newKey = TKey::FromKey(oldKey, TKeyPrefix::TypeData, Partition, StartOffset);
FormedBlobs.emplace_back(oldKey, newKey, size);
diff --git a/ydb/core/persqueue/cache_eviction.h b/ydb/core/persqueue/cache_eviction.h
index 767ffa0bf6d..09aeda4e1b5 100644
--- a/ydb/core/persqueue/cache_eviction.h
+++ b/ydb/core/persqueue/cache_eviction.h
@@ -106,7 +106,7 @@ namespace NKikimr::NPQ {
for (auto& blob : Blobs) {
if (blob.Value.empty()) {
// add reading command
- TKey key(TKeyPrefix::TypeData, Partition, blob.Offset, blob.PartNo, blob.Count, blob.InternalPartsCount);
+ auto key = TKey::ForBody(TKeyPrefix::TypeData, Partition, blob.Offset, blob.PartNo, blob.Count, blob.InternalPartsCount);
auto read = request->Record.AddCmdRead();
read->SetKey(key.Data(), key.Size());
}
@@ -142,7 +142,7 @@ namespace NKikimr::NPQ {
}
void Verify(const TRequestedBlob& blob) const {
- TKey key(TKeyPrefix::TypeData, TPartitionId(0), blob.Offset, blob.PartNo, blob.Count, blob.InternalPartsCount, false);
+ auto key = TKey::ForBody(TKeyPrefix::TypeData, TPartitionId(0), blob.Offset, blob.PartNo, blob.Count, blob.InternalPartsCount);
Y_ABORT_UNLESS(blob.Value.size() == blob.Size);
TClientBlob::CheckBlob(key, blob.Value);
}
@@ -324,7 +324,7 @@ namespace NKikimr::NPQ {
partitionId.InternalPartitionId = partitionId.OriginalPartitionId;
return {partitionId, 0, 0, 0, 0};
} else {
- TKey key(s);
+ auto key = TKey::FromString(s);
return {key.GetPartition(), key.GetOffset(), key.GetPartNo(), key.GetCount(), key.GetInternalPartsCount()};
}
}
diff --git a/ydb/core/persqueue/key.cpp b/ydb/core/persqueue/key.cpp
index 88319f2f391..e3794e8c62a 100644
--- a/ydb/core/persqueue/key.cpp
+++ b/ydb/core/persqueue/key.cpp
@@ -10,7 +10,7 @@ std::pair<TKeyPrefix, TKeyPrefix> MakeKeyPrefixRange(TKeyPrefix::EType type, con
return {std::move(from), std::move(to)};
}
-TKey MakeKeyFromString(const TString& s, const TPartitionId& partition)
+TKey TKey::FromString(const TString& s, const TPartitionId& partition)
{
TKey t(s);
return TKey(t.GetType(),
@@ -19,7 +19,45 @@ TKey MakeKeyFromString(const TString& s, const TPartitionId& partition)
t.GetPartNo(),
t.GetCount(),
t.GetInternalPartsCount(),
- t.IsHead());
+ t.GetSuffix());
+}
+
+TKey TKey::ForBody(EType type,
+ const TPartitionId& partition,
+ const ui64 offset,
+ const ui16 partNo,
+ const ui32 count,
+ const ui16 internalPartsCount)
+{
+ return {type, partition, offset, partNo, count, internalPartsCount, Nothing()};
+}
+
+TKey TKey::ForHead(EType type,
+ const TPartitionId& partition,
+ const ui64 offset,
+ const ui16 partNo,
+ const ui32 count,
+ const ui16 internalPartsCount)
+{
+ return {type, partition, offset, partNo, count, internalPartsCount, '|'};
+}
+
+TKey TKey::ForFastWrite(EType type,
+ const TPartitionId& partition,
+ const ui64 offset,
+ const ui16 partNo,
+ const ui32 count,
+ const ui16 internalPartsCount)
+{
+ return {type, partition, offset, partNo, count, internalPartsCount, '?'};
+}
+
+TKey TKey::FromKey(const TKey& k,
+ EType type,
+ const TPartitionId& partition,
+ ui64 offset)
+{
+ return {type, partition, offset, k.GetPartNo(), k.GetCount(), k.GetInternalPartsCount(), k.GetSuffix()};
}
void TKeyPrefix::SetTypeImpl(EType type, bool isServicePartition)
diff --git a/ydb/core/persqueue/key.h b/ydb/core/persqueue/key.h
index f9cddc6a3ca..c303fd948f6 100644
--- a/ydb/core/persqueue/key.h
+++ b/ydb/core/persqueue/key.h
@@ -11,7 +11,7 @@
namespace NKikimr {
namespace NPQ {
-// {char type; ui32 partiton; (char mark)}
+// {char type; ui32 partition; (char mark)}
class TKeyPrefix : public TBuffer
{
public:
@@ -54,7 +54,7 @@ public:
{}
TString ToString() const {
- return TString(Data(), Size());
+ return {Data(), Size()};
}
bool Marked(EMark mark) {
@@ -134,7 +134,7 @@ private:
std::pair<TKeyPrefix, TKeyPrefix> MakeKeyPrefixRange(TKeyPrefix::EType type, const TPartitionId& partition);
-// {char type; ui32 partiton; ui64 offset; ui16 partNo; ui32 count, ui16 internalPartsCount}
+// {char type; ui32 partition; ui64 offset; ui16 partNo; ui32 count, ui16 internalPartsCount}
// offset, partNo - index of first rec
// count - diff of last record offset and first record offset in blob
// internalPartsCount - number of internal parts
@@ -144,105 +144,90 @@ std::pair<TKeyPrefix, TKeyPrefix> MakeKeyPrefixRange(TKeyPrefix::EType type, con
class TKey : public TKeyPrefix
{
public:
- TKey(EType type, const TPartitionId& partition, const ui64 offset, const ui16 partNo, const ui32 count, const ui16 internalPartsCount, const bool isHead = false)
- : TKeyPrefix(type, partition)
- , Offset(offset)
- , Count(count)
- , PartNo(partNo)
- , InternalPartsCount(internalPartsCount)
- {
- Resize(KeySize());
- *(PtrOffset() - 1) = *(PtrCount() - 1) = *(PtrPartNo() - 1) = *(PtrInternalPartsCount() - 1) = '_';
- SetOffset(offset);
- SetPartNo(partNo);
- SetCount(count);
- SetInternalPartsCount(InternalPartsCount);
- SetHead(isHead);
- }
+ static TKey ForBody(EType type,
+ const TPartitionId& partition,
+ const ui64 offset,
+ const ui16 partNo,
+ const ui32 count,
+ const ui16 internalPartsCount);
+ static TKey ForHead(EType type,
+ const TPartitionId& partition,
+ const ui64 offset,
+ const ui16 partNo,
+ const ui32 count,
+ const ui16 internalPartsCount);
+ static TKey ForFastWrite(EType type,
+ const TPartitionId& partition,
+ const ui64 offset,
+ const ui16 partNo,
+ const ui32 count,
+ const ui16 internalPartsCount);
+
+ static TKey FromString(const TString& s) { return {s}; }
+ static TKey FromString(const TString& s, const TPartitionId& partition);
+
+ static TKey FromKey(const TKey& k,
+ EType type,
+ const TPartitionId& partitionId,
+ ui64 offset);
+
+ TKey()
+ : TKey(TypeNone, TPartitionId(0), 0, 0, 0, 0, false)
+ {}
TKey(const TKey& key)
- : TKey(key.GetType(), key.GetPartition(), key.Offset, key.PartNo, key.Count, key.InternalPartsCount, key.IsHead())
+ : TKey(key.GetType(), key.GetPartition(), key.Offset, key.PartNo, key.Count, key.InternalPartsCount, key.GetSuffix())
{
}
- TKey(const TString& data)
- {
- Assign(data.data(), data.size());
- Y_ABORT_UNLESS(data.size() == KeySize() + IsHead());
- Y_ABORT_UNLESS(*(PtrOffset() - 1) == '_');
- Y_ABORT_UNLESS(*(PtrCount() - 1) == '_');
- Y_ABORT_UNLESS(*(PtrPartNo() - 1) == '_');
- Y_ABORT_UNLESS(*(PtrInternalPartsCount() - 1) == '_');
-
- ParsePartition();
- ParseOffset();
- ParseCount();
- ParsePartNo();
- ParseInternalPartsCount();
- }
-
- TKey()
- : TKey(TypeNone, TPartitionId(0), 0, 0, 0, 0)
- {}
-
virtual ~TKey()
{}
- TString ToString() const {
- return TString(Data(), Size());
- }
-
- void SetHead(const bool isHead) {
- Resize(KeySize() + isHead);
- if (isHead)
- Data()[KeySize()] = '|';
- }
-
void SetOffset(const ui64 offset) {
- Y_ABORT_UNLESS(Size() == KeySize() + IsHead());
+ Y_ABORT_UNLESS(Size() == KeySize() + HasSuffix());
Offset = offset;
memcpy(PtrOffset(), Sprintf("%.20" PRIu64, offset).data(), 20);
}
ui64 GetOffset() const {
- Y_ABORT_UNLESS(Size() == KeySize() + IsHead());
+ Y_ABORT_UNLESS(Size() == KeySize() + HasSuffix());
return Offset;
}
void SetCount(const ui32 count) {
- Y_ABORT_UNLESS(Size() == KeySize() + IsHead());
+ Y_ABORT_UNLESS(Size() == KeySize() + HasSuffix());
Count = count;
memcpy(PtrCount(), Sprintf("%.10" PRIu32, count).data(), 10);
}
ui32 GetCount() const {
- Y_ABORT_UNLESS(Size() == KeySize() + IsHead());
+ Y_ABORT_UNLESS(Size() == KeySize() + HasSuffix());
return Count;
}
void SetPartNo(const ui16 partNo) {
- Y_ABORT_UNLESS(Size() == KeySize() + IsHead());
+ Y_ABORT_UNLESS(Size() == KeySize() + HasSuffix());
PartNo = partNo;
memcpy(PtrPartNo(), Sprintf("%.5" PRIu16, partNo).data(), 5);
}
ui16 GetPartNo() const {
- Y_ABORT_UNLESS(Size() == KeySize() + IsHead());
+ Y_ABORT_UNLESS(Size() == KeySize() + HasSuffix());
return PartNo;
}
void SetInternalPartsCount(const ui16 internalPartsCount) {
- Y_ABORT_UNLESS(Size() == KeySize() + IsHead());
+ Y_ABORT_UNLESS(Size() == KeySize() + HasSuffix());
InternalPartsCount = internalPartsCount;
memcpy(PtrInternalPartsCount(), Sprintf("%.5" PRIu16, internalPartsCount).data(), 5);
}
ui16 GetInternalPartsCount() const {
- Y_ABORT_UNLESS(Size() == KeySize() + IsHead());
+ Y_ABORT_UNLESS(Size() == KeySize() + HasSuffix());
return InternalPartsCount;
}
- bool IsHead() const {
+ bool HasSuffix() const {
return Size() == KeySize() + 1;
}
@@ -257,6 +242,38 @@ public:
}
private:
+ TKey(EType type, const TPartitionId& partition, const ui64 offset, const ui16 partNo, const ui32 count, const ui16 internalPartsCount, const TMaybe<char> suffix)
+ : TKeyPrefix(type, partition)
+ , Offset(offset)
+ , Count(count)
+ , PartNo(partNo)
+ , InternalPartsCount(internalPartsCount)
+ {
+ Resize(KeySize());
+ *(PtrOffset() - 1) = *(PtrCount() - 1) = *(PtrPartNo() - 1) = *(PtrInternalPartsCount() - 1) = '_';
+ SetOffset(offset);
+ SetPartNo(partNo);
+ SetCount(count);
+ SetInternalPartsCount(InternalPartsCount);
+ SetSuffix(suffix);
+ }
+
+ TKey(const TString& data)
+ {
+ Assign(data.data(), data.size());
+ Y_ABORT_UNLESS(data.size() == KeySize() + HasSuffix());
+ Y_ABORT_UNLESS(*(PtrOffset() - 1) == '_');
+ Y_ABORT_UNLESS(*(PtrCount() - 1) == '_');
+ Y_ABORT_UNLESS(*(PtrPartNo() - 1) == '_');
+ Y_ABORT_UNLESS(*(PtrInternalPartsCount() - 1) == '_');
+
+ ParsePartition();
+ ParseOffset();
+ ParseCount();
+ ParsePartNo();
+ ParseInternalPartsCount();
+ }
+
char* PtrOffset() { return Data() + UnmarkedSize() + 1; }
char* PtrPartNo() { return PtrOffset() + 20 + 1; }
char* PtrCount() { return PtrPartNo() + 5 + 1; }
@@ -287,14 +304,28 @@ private:
InternalPartsCount = FromString<ui16>(TStringBuf{PtrInternalPartsCount(), 5});
}
+ TMaybe<char> GetSuffix() const
+ {
+ if (HasSuffix()) {
+ return Data()[KeySize()];
+ }
+ return Nothing();
+ }
+
+ void SetSuffix(TMaybe<char> suffix)
+ {
+ Resize(KeySize() + suffix.Defined());
+ if (suffix.Defined()) {
+ Data()[KeySize()] = *suffix;
+ }
+ }
+
ui64 Offset;
ui32 Count;
ui16 PartNo;
ui16 InternalPartsCount;
};
-TKey MakeKeyFromString(const TString& s, const TPartitionId& partition);
-
inline
TString GetTxKey(ui64 txId)
{
diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp
index 9a37c987e1c..73bbd97f007 100644
--- a/ydb/core/persqueue/partition.cpp
+++ b/ydb/core/persqueue/partition.cpp
@@ -919,7 +919,7 @@ TInstant TPartition::GetWriteTimeEstimate(ui64 offset) const {
Y_ABORT_UNLESS(it != container.begin(),
"Tablet %lu StartOffset %lu, HeadOffset %lu, offset %lu, containter size %lu, first-elem: %s",
TabletID, StartOffset, Head.Offset, offset, container.size(),
- container.front().Key.ToString().c_str());
+ container.front().Key.ToString().data());
Y_ABORT_UNLESS(it == container.end() ||
it->Key.GetOffset() > offset ||
it->Key.GetOffset() == offset && it->Key.GetPartNo() > 0);
diff --git a/ydb/core/persqueue/partition_init.cpp b/ydb/core/persqueue/partition_init.cpp
index 1e134088279..863b85facba 100644
--- a/ydb/core/persqueue/partition_init.cpp
+++ b/ydb/core/persqueue/partition_init.cpp
@@ -516,7 +516,7 @@ THashSet<TString> FilterBlobsMetaData(const NKikimrClient::TKeyValueResponse::TR
for (ui32 i = 0; i < range.PairSize(); ++i) {
const auto& pair = range.GetPair(i);
Y_ABORT_UNLESS(pair.GetStatus() == NKikimrProto::OK); //this is readrange without keys, only OK could be here
- source.push_back(MakeKeyFromString(pair.GetKey(), partitionId));
+ source.push_back(TKey::FromString(pair.GetKey(), partitionId));
}
auto isKeyLess = [](const TKey& lhs, const TKey& rhs) {
@@ -590,7 +590,7 @@ void TInitDataRangeStep::FillBlobsMetaData(const NKikimrClient::TKeyValueRespons
for (ui32 i = 0; i < range.PairSize(); ++i) {
const auto& pair = range.GetPair(i);
Y_ABORT_UNLESS(pair.GetStatus() == NKikimrProto::OK); //this is readrange without keys, only OK could be here
- TKey k = MakeKeyFromString(pair.GetKey(), PartitionId());
+ auto k = TKey::FromString(pair.GetKey(), PartitionId());
if (!actualKeys.contains(pair.GetKey())) {
Partition()->DeletedKeys.emplace_back(k.ToString());
continue;
@@ -612,7 +612,7 @@ void TInitDataRangeStep::FillBlobsMetaData(const NKikimrClient::TKeyValueRespons
Y_ABORT_UNLESS(k.GetOffset() >= endOffset);
endOffset = k.GetOffset() + k.GetCount();
//at this point EndOffset > StartOffset
- if (!k.IsHead()) //head.Size will be filled after read or head blobs
+ if (!k.HasSuffix()) //head.Size will be filled after read or head blobs
bodySize += pair.GetValueSize();
PQ_LOG_D("Got data offset " << k.GetOffset() << " count " << k.GetCount() << " size " << pair.GetValueSize()
@@ -639,7 +639,7 @@ void TInitDataRangeStep::FormHeadAndProceed() {
head.Offset = endOffset;
head.PartNo = 0;
- while (dataKeysBody.size() > 0 && dataKeysBody.back().Key.IsHead()) {
+ while (dataKeysBody.size() > 0 && dataKeysBody.back().Key.HasSuffix()) {
Y_ABORT_UNLESS(dataKeysBody.back().Key.GetOffset() + dataKeysBody.back().Key.GetCount() == head.Offset); //no gaps in head allowed
headKeys.push_front(dataKeysBody.back());
head.Offset = dataKeysBody.back().Key.GetOffset();
@@ -647,7 +647,7 @@ void TInitDataRangeStep::FormHeadAndProceed() {
dataKeysBody.pop_back();
}
for (const auto& p : dataKeysBody) {
- Y_ABORT_UNLESS(!p.Key.IsHead());
+ Y_ABORT_UNLESS(!p.Key.HasSuffix());
}
Y_ABORT_UNLESS(headKeys.empty() || head.Offset == headKeys.front().Key.GetOffset() && head.PartNo == headKeys.front().Key.GetPartNo());
@@ -707,7 +707,7 @@ void TInitDataStep::Handle(TEvKeyValue::TEvResponse::TPtr &ev, const TActorConte
switch(read.GetStatus()) {
case NKikimrProto::OK: {
const TKey& key = headKeys[i].Key;
- Y_ABORT_UNLESS(key.IsHead());
+ Y_ABORT_UNLESS(key.HasSuffix());
ui32 size = headKeys[i].Size;
ui64 offset = key.GetOffset();
diff --git a/ydb/core/persqueue/partition_read.cpp b/ydb/core/persqueue/partition_read.cpp
index 4161ea3fd99..faf618176a8 100644
--- a/ydb/core/persqueue/partition_read.cpp
+++ b/ydb/core/persqueue/partition_read.cpp
@@ -490,7 +490,7 @@ TReadAnswer TReadInfo::FormAnswer(
}
Y_ABORT_UNLESS(offset <= Offset);
Y_ABORT_UNLESS(offset < Offset || partNo <= PartNo);
- TKey key(TKeyPrefix::TypeData, TPartitionId(0), offset, partNo, count, internalPartsCount, false);
+ auto key = TKey::ForBody(TKeyPrefix::TypeData, TPartitionId(0), offset, partNo, count, internalPartsCount);
ui64 firstHeaderOffset = GetFirstHeaderOffset(key, blobValue);
for (TBlobIterator it(key, blobValue); it.IsValid() && !needStop; it.Next()) {
TBatch batch = it.GetBatch();
diff --git a/ydb/core/persqueue/partition_write.cpp b/ydb/core/persqueue/partition_write.cpp
index c590b3b6e4e..efe86e104da 100644
--- a/ydb/core/persqueue/partition_write.cpp
+++ b/ydb/core/persqueue/partition_write.cpp
@@ -419,7 +419,7 @@ void TPartition::SyncMemoryStateWithKVState(const TActorContext& ctx) {
while (!CompactedKeys.empty()) {
const auto& ck = CompactedKeys.front();
BodySize += ck.second;
- Y_ABORT_UNLESS(!ck.first.IsHead());
+ Y_ABORT_UNLESS(!ck.first.HasSuffix());
ui64 lastOffset = DataKeysBody.empty() ? 0 : (DataKeysBody.back().Key.GetOffset() + DataKeysBody.back().Key.GetCount());
Y_ABORT_UNLESS(lastOffset <= ck.first.GetOffset());
if (DataKeysBody.empty()) {
@@ -1060,10 +1060,10 @@ void TPartition::AddCmdWrite(const std::optional<TPartitionedBlob::TFormedBlobIn
const TActorContext& ctx)
{
auto write = request->Record.AddCmdWrite();
- write->SetKey(newWrite->Key.ToString());
+ write->SetKey(newWrite->Key.Data(), newWrite->Key.Size());
write->SetValue(newWrite->Value);
- Y_ABORT_UNLESS(!newWrite->Key.IsHead());
- auto channel = GetChannel(NextChannel(newWrite->Key.IsHead(), newWrite->Value.size()));
+ Y_ABORT_UNLESS(!newWrite->Key.HasSuffix());
+ auto channel = GetChannel(NextChannel(newWrite->Key.HasSuffix(), newWrite->Value.size()));
write->SetStorageChannel(channel);
write->SetTactic(AppData(ctx)->PQConfig.GetTactic());
@@ -1115,10 +1115,10 @@ ui32 TPartition::RenameTmpCmdWrites(TEvKeyValue::TEvRequest* request)
{
ui32 curWrites = 0;
for (ui32 i = 0; i < request->Record.CmdWriteSize(); ++i) { //change keys for yet to be writed KV pairs
- TKey key(request->Record.GetCmdWrite(i).GetKey());
+ auto key = TKey::FromString(request->Record.GetCmdWrite(i).GetKey());
if (key.GetType() == TKeyPrefix::TypeTmpData) {
key.SetType(TKeyPrefix::TypeData);
- request->Record.MutableCmdWrite(i)->SetKey(TString(key.Data(), key.Size()));
+ request->Record.MutableCmdWrite(i)->SetKey(key.Data(), key.Size());
++curWrites;
}
}
@@ -1259,7 +1259,7 @@ bool TPartition::ExecRequest(TWriteMsg& p, ProcessParameters& parameters, TEvKey
auto oldCmdWrite = request->Record.GetCmdWrite();
request->Record.ClearCmdWrite();
for (ui32 i = 0; i < (ui32)oldCmdWrite.size(); ++i) {
- TKey key(oldCmdWrite.Get(i).GetKey());
+ auto key = TKey::FromString(oldCmdWrite.Get(i).GetKey());
if (key.GetType() != TKeyPrefix::TypeTmpData) {
request->Record.AddCmdWrite()->CopyFrom(oldCmdWrite.Get(i));
}
@@ -1386,7 +1386,12 @@ bool TPartition::ExecRequest(TWriteMsg& p, ProcessParameters& parameters, TEvKey
std::pair<TKey, ui32> TPartition::GetNewWriteKeyImpl(bool headCleared, bool needCompaction, ui32 HeadSize)
{
- TKey key(TKeyPrefix::TypeData, Partition, NewHead.Offset, NewHead.PartNo, NewHead.GetCount(), NewHead.GetInternalPartsCount(), !needCompaction);
+ TKey key;
+ if (needCompaction) {
+ key = TKey::ForBody(TKeyPrefix::TypeData, Partition, NewHead.Offset, NewHead.PartNo, NewHead.GetCount(), NewHead.GetInternalPartsCount());
+ } else {
+ key = TKey::ForHead(TKeyPrefix::TypeData, Partition, NewHead.Offset, NewHead.PartNo, NewHead.GetCount(), NewHead.GetInternalPartsCount());
+ }
if (NewHead.PackedSize > 0)
DataKeysHead[TotalLevels - 1].AddKey(key, NewHead.PackedSize);
@@ -1399,13 +1404,13 @@ std::pair<TKey, ui32> TPartition::GetNewWriteKeyImpl(bool headCleared, bool need
DataKeysHead[i].Clear();
}
if (!headCleared) { //compacted blob must contain both head and NewHead
- key = TKey(TKeyPrefix::TypeData, Partition, Head.Offset, Head.PartNo, NewHead.GetCount() + Head.GetCount(),
- Head.GetInternalPartsCount() + NewHead.GetInternalPartsCount(), false);
- } //otherwise KV blob is not from head (!key.IsHead()) and contains only new data from NewHead
+ key = TKey::ForBody(TKeyPrefix::TypeData, Partition, Head.Offset, Head.PartNo, NewHead.GetCount() + Head.GetCount(),
+ Head.GetInternalPartsCount() + NewHead.GetInternalPartsCount());
+ } //otherwise KV blob is not from head (!key.HasSuffix()) and contains only new data from NewHead
res = std::make_pair(key, HeadSize + NewHead.PackedSize);
} else {
res = Compact(key, NewHead.PackedSize, headCleared);
- Y_ABORT_UNLESS(res.first.IsHead());//may compact some KV blobs from head, but new KV blob is from head too
+ Y_ABORT_UNLESS(res.first.HasSuffix());//may compact some KV blobs from head, but new KV blob is from head too
Y_ABORT_UNLESS(res.second >= NewHead.PackedSize); //at least new data must be writed
}
Y_ABORT_UNLESS(res.second <= MaxBlobSize);
@@ -1455,7 +1460,7 @@ void TPartition::AddNewWriteBlob(std::pair<TKey, ui32>& res, TEvKeyValue::TEvReq
Y_ABORT_UNLESS(res.second >= valueD.size());
- if (res.second > valueD.size() && res.first.IsHead()) { //change to real size if real packed size is smaller
+ if (res.second > valueD.size() && res.first.HasSuffix()) { //change to real size if real packed size is smaller
Y_ABORT("Can't be here right now, only after merging of small batches");
@@ -1477,7 +1482,7 @@ void TPartition::AddNewWriteBlob(std::pair<TKey, ui32>& res, TEvKeyValue::TEvReq
}
}
- Y_ABORT_UNLESS(res.second == valueD.size() || res.first.IsHead());
+ Y_ABORT_UNLESS(res.second == valueD.size() || res.first.HasSuffix());
TClientBlob::CheckBlob(key, valueD);
@@ -1485,12 +1490,12 @@ void TPartition::AddNewWriteBlob(std::pair<TKey, ui32>& res, TEvKeyValue::TEvReq
write->SetKey(key.Data(), key.Size());
write->SetValue(valueD);
- bool isInline = key.IsHead() && valueD.size() < MAX_INLINE_SIZE;
+ bool isInline = key.HasSuffix() && valueD.size() < MAX_INLINE_SIZE;
if (isInline)
write->SetStorageChannel(NKikimrClient::TKeyValueRequest::INLINE);
else {
- auto channel = GetChannel(NextChannel(key.IsHead(), valueD.size()));
+ auto channel = GetChannel(NextChannel(key.HasSuffix(), valueD.size()));
write->SetStorageChannel(channel);
write->SetTactic(AppData(ctx)->PQConfig.GetTactic());
}
@@ -1499,7 +1504,7 @@ void TPartition::AddNewWriteBlob(std::pair<TKey, ui32>& res, TEvKeyValue::TEvReq
const TKey& k = CompactedKeys.empty() ? key : CompactedKeys.front().first;
ClearOldHead(k.GetOffset(), k.GetPartNo(), request);
- if (!key.IsHead()) {
+ if (!key.HasSuffix()) {
if (!DataKeysBody.empty() && CompactedKeys.empty()) {
Y_ABORT_UNLESS(DataKeysBody.back().Key.GetOffset() + DataKeysBody.back().Key.GetCount() <= key.GetOffset(),
"LAST KEY %s, HeadOffset %lu, NEWKEY %s", DataKeysBody.back().Key.ToString().c_str(), Head.Offset, key.ToString().c_str());
diff --git a/ydb/core/persqueue/read.h b/ydb/core/persqueue/read.h
index d28d5f459f4..1891092e271 100644
--- a/ydb/core/persqueue/read.h
+++ b/ydb/core/persqueue/read.h
@@ -289,7 +289,7 @@ namespace NPQ {
Y_ABORT_UNLESS((strKey.size() >= TKey::KeySize()) && (strKey.size() - TKey::KeySize() <= 1),
"Unexpected key size: %" PRIu64 " (%s)",
strKey.size(), strKey.data());
- TKey key(strKey);
+ auto key = TKey::FromString(strKey);
const TString& value = cmd.GetValue();
kvReq.Partition = key.GetPartition();
diff --git a/ydb/core/persqueue/ut/internals_ut.cpp b/ydb/core/persqueue/ut/internals_ut.cpp
index 0a2477bc3aa..1d3a273432e 100644
--- a/ydb/core/persqueue/ut/internals_ut.cpp
+++ b/ydb/core/persqueue/ut/internals_ut.cpp
@@ -299,26 +299,69 @@ Y_UNIT_TEST(TestToHex) {
}
Y_UNIT_TEST(StoreKeys) {
- TKey keyOld(TKeyPrefix::TypeData, TPartitionId{9}, 8, 7, 6, 5, false);
+ // key for Body
+ auto keyOld = TKey::ForBody(TKeyPrefix::TypeData, TPartitionId{9}, 8, 7, 6, 5);
UNIT_ASSERT_VALUES_EQUAL(keyOld.ToString(), "d0000000009_00000000000000000008_00007_0000000006_00005");
- TKey keyNew(TKeyPrefix::TypeData, TPartitionId{5, TWriteId{0, 1}, 9}, 8, 7, 6, 5, false);
+ auto keyNew = TKey::ForBody(TKeyPrefix::TypeData, TPartitionId{5, TWriteId{0, 1}, 9}, 8, 7, 6, 5);
UNIT_ASSERT_VALUES_EQUAL(keyNew.ToString(), "D0000000009_00000000000000000008_00007_0000000006_00005");
keyNew.SetType(TKeyPrefix::TypeInfo);
UNIT_ASSERT_VALUES_EQUAL(keyNew.ToString(), "M0000000009_00000000000000000008_00007_0000000006_00005");
+
+ // key for Head
+ auto keyHead = TKey::ForHead(TKeyPrefix::TypeData, TPartitionId{9}, 8, 7, 6, 5);
+ UNIT_ASSERT_VALUES_EQUAL(keyHead.ToString(), "d0000000009_00000000000000000008_00007_0000000006_00005|");
+
+ keyHead = TKey::FromKey(keyHead, TKeyPrefix::TypeData, TPartitionId{10}, 11);
+ UNIT_ASSERT_VALUES_EQUAL(keyHead.ToString(), "d0000000010_00000000000000000011_00007_0000000006_00005|");
+
+ // key for FastWrite
+ auto keyFastWrite = TKey::ForFastWrite(TKeyPrefix::TypeData, TPartitionId{9}, 8, 7, 6, 5);
+ UNIT_ASSERT_VALUES_EQUAL(keyFastWrite.ToString(), "d0000000009_00000000000000000008_00007_0000000006_00005?");
+
+ keyFastWrite = TKey::FromKey(keyFastWrite, TKeyPrefix::TypeData, TPartitionId{12}, 13);
+ UNIT_ASSERT_VALUES_EQUAL(keyFastWrite.ToString(), "d0000000012_00000000000000000013_00007_0000000006_00005?");
}
Y_UNIT_TEST(RestoreKeys) {
+ // the key from the string
{
- TKey key("X0000000001_00000000000000000002_00003_0000000004_00005");
+ auto key = TKey::FromString("X0000000001_00000000000000000002_00003_0000000004_00005");
UNIT_ASSERT(key.GetType() == TKeyPrefix::TypeTmpData);
UNIT_ASSERT_VALUES_EQUAL(key.GetPartition().InternalPartitionId, 1);
+ UNIT_ASSERT_VALUES_EQUAL(key.GetOffset(), 2);
+ UNIT_ASSERT_VALUES_EQUAL(key.GetPartNo(), 3);
+ UNIT_ASSERT_VALUES_EQUAL(key.GetCount(), 4);
+ UNIT_ASSERT_VALUES_EQUAL(key.GetInternalPartsCount(), 5);
+ UNIT_ASSERT(!key.HasSuffix());
}
+
+ // blob type
{
- TKey key("i0000000001_00000000000000000002_00003_0000000004_00005");
+ auto key = TKey::FromString("i0000000001_00000000000000000002_00003_0000000004_00005");
UNIT_ASSERT(key.GetType() == TKeyPrefix::TypeMeta);
- UNIT_ASSERT_VALUES_EQUAL(key.GetPartition().InternalPartitionId, 1);
+ }
+
+ // the `partitionId` is being replaced
+ {
+ auto key = TKey::FromString("d0000000002_00000000000000000013_00007_0000000006_00005", TPartitionId{3});
+ UNIT_ASSERT_VALUES_EQUAL(key.GetPartition().InternalPartitionId, 3);
+ UNIT_ASSERT(!key.HasSuffix());
+ }
+
+ // key for FastWrite
+ {
+ auto key = TKey::FromString("d0000000002_00000000000000000013_00007_0000000006_00005?", TPartitionId{4});
+ UNIT_ASSERT_VALUES_EQUAL(key.GetPartition().InternalPartitionId, 4);
+ UNIT_ASSERT(key.HasSuffix());
+ }
+
+ // key for head
+ {
+ auto key = TKey::FromString("d0000000002_00000000000000000013_00007_0000000006_00005|", TPartitionId{8});
+ UNIT_ASSERT_VALUES_EQUAL(key.GetPartition().InternalPartitionId, 8);
+ UNIT_ASSERT(key.HasSuffix());
}
}
diff --git a/ydb/core/persqueue/ut/partition_ut.cpp b/ydb/core/persqueue/ut/partition_ut.cpp
index 1f67f68deed..234f734f54a 100644
--- a/ydb/core/persqueue/ut/partition_ut.cpp
+++ b/ydb/core/persqueue/ut/partition_ut.cpp
@@ -935,9 +935,16 @@ void TPartitionFixture::SendDataRangeResponse(ui32 partitionId,
auto read = event->Record.AddReadRangeResult();
read->SetStatus(NKikimrProto::OK);
auto pair = read->AddPair();
- NPQ::TKey key(NPQ::TKeyPrefix::TypeData, TPartitionId(partitionId), begin, 0, end - begin, 0, isHead);
+
+ TKey key;
+ if (isHead) {
+ key = TKey::ForHead(TKeyPrefix::TypeData, TPartitionId(partitionId), begin, 0, end - begin, 0);
+ } else {
+ key = TKey::ForBody(TKeyPrefix::TypeData, TPartitionId(partitionId), begin, 0, end - begin, 0);
+ }
+
pair->SetStatus(NKikimrProto::OK);
- pair->SetKey(key.ToString());
+ pair->SetKey(key.Data(), key.Size());
pair->SetValueSize(684);
pair->SetCreationUnixTime(TInstant::Now().Seconds());
diff --git a/ydb/core/persqueue/ut/pq_ut.cpp b/ydb/core/persqueue/ut/pq_ut.cpp
index f4135596b4d..64c3a99f6b3 100644
--- a/ydb/core/persqueue/ut/pq_ut.cpp
+++ b/ydb/core/persqueue/ut/pq_ut.cpp
@@ -2036,12 +2036,12 @@ Y_UNIT_TEST(TestPQCacheSizeManagement) {
Y_UNIT_TEST(TestOffsetEstimation) {
std::deque<NPQ::TDataKey> container = {
- {NPQ::TKey(NPQ::TKeyPrefix::EType::TypeNone, TPartitionId(0), 1, 0, 0, 0), 0, TInstant::Seconds(1), 10},
- {NPQ::TKey(NPQ::TKeyPrefix::EType::TypeNone, TPartitionId(0), 2, 0, 0, 0), 0, TInstant::Seconds(1), 10},
- {NPQ::TKey(NPQ::TKeyPrefix::EType::TypeNone, TPartitionId(0), 3, 0, 0, 0), 0, TInstant::Seconds(2), 10},
- {NPQ::TKey(NPQ::TKeyPrefix::EType::TypeNone, TPartitionId(0), 4, 0, 0, 0), 0, TInstant::Seconds(2), 10},
- {NPQ::TKey(NPQ::TKeyPrefix::EType::TypeNone, TPartitionId(0), 5, 0, 0, 0), 0, TInstant::Seconds(3), 10},
- {NPQ::TKey(NPQ::TKeyPrefix::EType::TypeNone, TPartitionId(0), 6, 0, 0, 0), 0, TInstant::Seconds(3), 10},
+ {NPQ::TKey::ForBody(NPQ::TKeyPrefix::EType::TypeNone, TPartitionId(0), 1, 0, 0, 0), 0, TInstant::Seconds(1), 10},
+ {NPQ::TKey::ForBody(NPQ::TKeyPrefix::EType::TypeNone, TPartitionId(0), 2, 0, 0, 0), 0, TInstant::Seconds(1), 10},
+ {NPQ::TKey::ForBody(NPQ::TKeyPrefix::EType::TypeNone, TPartitionId(0), 3, 0, 0, 0), 0, TInstant::Seconds(2), 10},
+ {NPQ::TKey::ForBody(NPQ::TKeyPrefix::EType::TypeNone, TPartitionId(0), 4, 0, 0, 0), 0, TInstant::Seconds(2), 10},
+ {NPQ::TKey::ForBody(NPQ::TKeyPrefix::EType::TypeNone, TPartitionId(0), 5, 0, 0, 0), 0, TInstant::Seconds(3), 10},
+ {NPQ::TKey::ForBody(NPQ::TKeyPrefix::EType::TypeNone, TPartitionId(0), 6, 0, 0, 0), 0, TInstant::Seconds(3), 10},
};
UNIT_ASSERT_EQUAL(NPQ::GetOffsetEstimate({}, TInstant::MilliSeconds(0), 9999), 9999);
UNIT_ASSERT_EQUAL(NPQ::GetOffsetEstimate(container, TInstant::MilliSeconds(0), 9999), 1);
diff --git a/ydb/core/persqueue/ut/user_action_processor_ut.cpp b/ydb/core/persqueue/ut/user_action_processor_ut.cpp
index 0796c620f39..27ed3ed4ad6 100644
--- a/ydb/core/persqueue/ut/user_action_processor_ut.cpp
+++ b/ydb/core/persqueue/ut/user_action_processor_ut.cpp
@@ -633,7 +633,7 @@ void TUserActionProcessorFixture::SendDataRangeResponse(ui64 begin, ui64 end)
auto pair = read->AddPair();
NPQ::TKey key(NPQ::TKeyPrefix::TypeData, 1, begin, 0, end - begin, 0);
pair->SetStatus(NKikimrProto::OK);
- pair->SetKey(key.ToString());
+ pair->SetKey(key.Data(), key.Size());
//pair->SetValueSize();
pair->SetCreationUnixTime(0);