summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorazevaykin <[email protected]>2023-05-25 18:37:19 +0300
committerazevaykin <[email protected]>2023-05-25 18:37:19 +0300
commitbf75c176de7cc511cc699e9570e1b95fe8a4bfa5 (patch)
tree97bd5abbcbe5c40f3cdd664613a6866f9fd69df6
parent575a3eb78d1a0ea3c29c9fecbac5364b503c3ee7 (diff)
Eliminate TString creation in Serialize
![](https://arcanum.s3.mds.yandex.net/files/azevaykin/EzeZvnh6Lzkmdlh2CvNBF) Внутри функции TBatch::Serialize лишнее создание строки
-rw-r--r--ydb/core/persqueue/blob.cpp21
-rw-r--r--ydb/core/persqueue/blob.h4
-rw-r--r--ydb/core/persqueue/partition_write.cpp4
-rw-r--r--ydb/core/persqueue/ut/internals_ut.cpp6
4 files changed, 20 insertions, 15 deletions
diff --git a/ydb/core/persqueue/blob.cpp b/ydb/core/persqueue/blob.cpp
index b35d4554490..859eafa43bc 100644
--- a/ydb/core/persqueue/blob.cpp
+++ b/ydb/core/persqueue/blob.cpp
@@ -72,7 +72,7 @@ void CheckBlob(const TKey& key, const TString& blob)
}
-void TClientBlob::Serialize(TBuffer& res) const
+void TClientBlob::SerializeTo(TBuffer& res) const
{
ui32 totalSize = GetBlobSize();
ui32 psize = res.Size();
@@ -185,13 +185,16 @@ TClientBlob TClientBlob::Deserialize(const char* data, ui32 size)
return TClientBlob(sourceId, seqNo, dt, std::move(partData), writeTimestamp, createTimestamp, us, partitionKey, explicitHashKey);
}
-TString TBatch::Serialize() {
+void TBatch::SerializeTo(TString& res) {
Y_VERIFY(Packed);
- TString res;
+
ui16 sz = Header.ByteSize();
- bool rs = Header.SerializeToString(&res);
+ res.append((const char*)&sz, sizeof(ui16));
+
+ bool rs = Header.AppendToString(&res);
Y_VERIFY(rs);
- return TStringBuf((const char*)&sz, sizeof(ui16)) + res + PackedData;
+
+ res += PackedData;
}
template <typename TCodec>
@@ -371,7 +374,7 @@ void TBatch::Pack() {
Header.SetFormat(NKikimrPQ::TBatchHeader::EUncompressed);
res.Clear();
for (ui32 i = 0; i < Blobs.size(); ++i) {
- Blobs[i].Serialize(res);
+ Blobs[i].SerializeTo(res);
}
PackedData = TString{res.Data(), res.Size()};
Header.SetPayloadSize(PackedData.size());
@@ -760,7 +763,7 @@ TString TPartitionedBlob::CompactHead(bool glueHead, THead& head, bool glueNewHe
if (glueHead) {
for (ui32 pp = 0; pp < head.Batches.size(); ++pp) {
Y_VERIFY(head.Batches[pp].Packed);
- valueD += head.Batches[pp].Serialize();
+ head.Batches[pp].SerializeTo(valueD);
}
}
if (glueNewHead) {
@@ -774,7 +777,7 @@ TString TPartitionedBlob::CompactHead(bool glueHead, THead& head, bool glueNewHe
b = &batch;
}
Y_VERIFY(b->Packed);
- valueD += b->Serialize();
+ b->SerializeTo(valueD);
}
}
return valueD;
@@ -815,7 +818,7 @@ std::pair<TKey, TString> TPartitionedBlob::Add(TClientBlob&& blob)
Blobs.clear();
batch.Pack();
Y_VERIFY(batch.Packed);
- valueD += batch.Serialize();
+ batch.SerializeTo(valueD);
}
res.second = valueD;
Y_VERIFY(res.second.size() <= MaxBlobSize && (res.second.size() + size + 1_MB > MaxBlobSize
diff --git a/ydb/core/persqueue/blob.h b/ydb/core/persqueue/blob.h
index 244349edfed..5f9ef43914f 100644
--- a/ydb/core/persqueue/blob.h
+++ b/ydb/core/persqueue/blob.h
@@ -92,7 +92,7 @@ struct TClientBlob {
static const ui32 OVERHEAD = sizeof(ui32)/*totalSize*/ + sizeof(ui64)/*SeqNo*/ + sizeof(ui16) /*SourceId*/ + sizeof(ui64) /*WriteTimestamp*/ + sizeof(ui64) /*CreateTimestamp*/;
- void Serialize(TBuffer& buffer) const;
+ void SerializeTo(TBuffer& buffer) const;
static TClientBlob Deserialize(const char *data, ui32 size);
};
@@ -189,7 +189,7 @@ struct TBatch {
void UnpackToType0(TVector<TClientBlob> *result);
void UnpackToType1(TVector<TClientBlob> *result);
- TString Serialize();
+ void SerializeTo(TString& res);
ui32 FindPos(const ui64 offset, const ui16 partNo) const;
diff --git a/ydb/core/persqueue/partition_write.cpp b/ydb/core/persqueue/partition_write.cpp
index 12ed674b7da..c9910b8d4f9 100644
--- a/ydb/core/persqueue/partition_write.cpp
+++ b/ydb/core/persqueue/partition_write.cpp
@@ -1147,12 +1147,12 @@ void TPartition::AddNewWriteBlob(std::pair<TKey, ui32>& res, TEvKeyValue::TEvReq
Y_VERIFY(Head.Batches[pp].GetPartNo() == key.GetPartNo());
for (; pp < Head.Batches.size(); ++pp) { //TODO - merge small batches here
Y_VERIFY(Head.Batches[pp].Packed);
- valueD += Head.Batches[pp].Serialize();
+ Head.Batches[pp].SerializeTo(valueD);
}
}
for (auto& b : NewHead.Batches) {
Y_VERIFY(b.Packed);
- valueD += b.Serialize();
+ b.SerializeTo(valueD);
}
Y_VERIFY(res.second >= valueD.size());
diff --git a/ydb/core/persqueue/ut/internals_ut.cpp b/ydb/core/persqueue/ut/internals_ut.cpp
index ce426c4ea38..22db0517609 100644
--- a/ydb/core/persqueue/ut/internals_ut.cpp
+++ b/ydb/core/persqueue/ut/internals_ut.cpp
@@ -52,7 +52,8 @@ void Test(bool headCompacted, ui32 parts, ui32 partSize, ui32 leftInHead)
UNIT_ASSERT(head.Batches.back().Header.GetFormat() == NKikimrPQ::TBatchHeader::ECompressed);
head.Batches.back().Unpack();
head.Batches.back().Pack();
- TString str = head.Batches.back().Serialize();
+ TString str;
+ head.Batches.back().SerializeTo(str);
auto header = ExtractHeader(str.c_str(), str.size());
TBatch batch(header, str.c_str() + header.ByteSize() + sizeof(ui16));
batch.Unpack();
@@ -186,7 +187,8 @@ Y_UNIT_TEST(TestBatchPacking) {
batch.Unpack();
batch.Pack();
UNIT_ASSERT(batch.PackedData == s);
- TString str = batch.Serialize();
+ TString str;
+ batch.SerializeTo(str);
auto header = ExtractHeader(str.c_str(), str.size());
TBatch batch2(header, str.c_str() + header.ByteSize() + sizeof(ui16));
batch2.Unpack();