diff options
author | azevaykin <[email protected]> | 2023-05-25 18:37:19 +0300 |
---|---|---|
committer | azevaykin <[email protected]> | 2023-05-25 18:37:19 +0300 |
commit | bf75c176de7cc511cc699e9570e1b95fe8a4bfa5 (patch) | |
tree | 97bd5abbcbe5c40f3cdd664613a6866f9fd69df6 | |
parent | 575a3eb78d1a0ea3c29c9fecbac5364b503c3ee7 (diff) |
Eliminate TString creation in Serialize

Внутри функции TBatch::Serialize лишнее создание строки
-rw-r--r-- | ydb/core/persqueue/blob.cpp | 21 | ||||
-rw-r--r-- | ydb/core/persqueue/blob.h | 4 | ||||
-rw-r--r-- | ydb/core/persqueue/partition_write.cpp | 4 | ||||
-rw-r--r-- | ydb/core/persqueue/ut/internals_ut.cpp | 6 |
4 files changed, 20 insertions, 15 deletions
diff --git a/ydb/core/persqueue/blob.cpp b/ydb/core/persqueue/blob.cpp index b35d4554490..859eafa43bc 100644 --- a/ydb/core/persqueue/blob.cpp +++ b/ydb/core/persqueue/blob.cpp @@ -72,7 +72,7 @@ void CheckBlob(const TKey& key, const TString& blob) } -void TClientBlob::Serialize(TBuffer& res) const +void TClientBlob::SerializeTo(TBuffer& res) const { ui32 totalSize = GetBlobSize(); ui32 psize = res.Size(); @@ -185,13 +185,16 @@ TClientBlob TClientBlob::Deserialize(const char* data, ui32 size) return TClientBlob(sourceId, seqNo, dt, std::move(partData), writeTimestamp, createTimestamp, us, partitionKey, explicitHashKey); } -TString TBatch::Serialize() { +void TBatch::SerializeTo(TString& res) { Y_VERIFY(Packed); - TString res; + ui16 sz = Header.ByteSize(); - bool rs = Header.SerializeToString(&res); + res.append((const char*)&sz, sizeof(ui16)); + + bool rs = Header.AppendToString(&res); Y_VERIFY(rs); - return TStringBuf((const char*)&sz, sizeof(ui16)) + res + PackedData; + + res += PackedData; } template <typename TCodec> @@ -371,7 +374,7 @@ void TBatch::Pack() { Header.SetFormat(NKikimrPQ::TBatchHeader::EUncompressed); res.Clear(); for (ui32 i = 0; i < Blobs.size(); ++i) { - Blobs[i].Serialize(res); + Blobs[i].SerializeTo(res); } PackedData = TString{res.Data(), res.Size()}; Header.SetPayloadSize(PackedData.size()); @@ -760,7 +763,7 @@ TString TPartitionedBlob::CompactHead(bool glueHead, THead& head, bool glueNewHe if (glueHead) { for (ui32 pp = 0; pp < head.Batches.size(); ++pp) { Y_VERIFY(head.Batches[pp].Packed); - valueD += head.Batches[pp].Serialize(); + head.Batches[pp].SerializeTo(valueD); } } if (glueNewHead) { @@ -774,7 +777,7 @@ TString TPartitionedBlob::CompactHead(bool glueHead, THead& head, bool glueNewHe b = &batch; } Y_VERIFY(b->Packed); - valueD += b->Serialize(); + b->SerializeTo(valueD); } } return valueD; @@ -815,7 +818,7 @@ std::pair<TKey, TString> TPartitionedBlob::Add(TClientBlob&& blob) Blobs.clear(); batch.Pack(); Y_VERIFY(batch.Packed); - valueD += batch.Serialize(); + batch.SerializeTo(valueD); } res.second = valueD; Y_VERIFY(res.second.size() <= MaxBlobSize && (res.second.size() + size + 1_MB > MaxBlobSize diff --git a/ydb/core/persqueue/blob.h b/ydb/core/persqueue/blob.h index 244349edfed..5f9ef43914f 100644 --- a/ydb/core/persqueue/blob.h +++ b/ydb/core/persqueue/blob.h @@ -92,7 +92,7 @@ struct TClientBlob { static const ui32 OVERHEAD = sizeof(ui32)/*totalSize*/ + sizeof(ui64)/*SeqNo*/ + sizeof(ui16) /*SourceId*/ + sizeof(ui64) /*WriteTimestamp*/ + sizeof(ui64) /*CreateTimestamp*/; - void Serialize(TBuffer& buffer) const; + void SerializeTo(TBuffer& buffer) const; static TClientBlob Deserialize(const char *data, ui32 size); }; @@ -189,7 +189,7 @@ struct TBatch { void UnpackToType0(TVector<TClientBlob> *result); void UnpackToType1(TVector<TClientBlob> *result); - TString Serialize(); + void SerializeTo(TString& res); ui32 FindPos(const ui64 offset, const ui16 partNo) const; diff --git a/ydb/core/persqueue/partition_write.cpp b/ydb/core/persqueue/partition_write.cpp index 12ed674b7da..c9910b8d4f9 100644 --- a/ydb/core/persqueue/partition_write.cpp +++ b/ydb/core/persqueue/partition_write.cpp @@ -1147,12 +1147,12 @@ void TPartition::AddNewWriteBlob(std::pair<TKey, ui32>& res, TEvKeyValue::TEvReq Y_VERIFY(Head.Batches[pp].GetPartNo() == key.GetPartNo()); for (; pp < Head.Batches.size(); ++pp) { //TODO - merge small batches here Y_VERIFY(Head.Batches[pp].Packed); - valueD += Head.Batches[pp].Serialize(); + Head.Batches[pp].SerializeTo(valueD); } } for (auto& b : NewHead.Batches) { Y_VERIFY(b.Packed); - valueD += b.Serialize(); + b.SerializeTo(valueD); } Y_VERIFY(res.second >= valueD.size()); diff --git a/ydb/core/persqueue/ut/internals_ut.cpp b/ydb/core/persqueue/ut/internals_ut.cpp index ce426c4ea38..22db0517609 100644 --- a/ydb/core/persqueue/ut/internals_ut.cpp +++ b/ydb/core/persqueue/ut/internals_ut.cpp @@ -52,7 +52,8 @@ void Test(bool headCompacted, ui32 parts, ui32 partSize, ui32 leftInHead) UNIT_ASSERT(head.Batches.back().Header.GetFormat() == NKikimrPQ::TBatchHeader::ECompressed); head.Batches.back().Unpack(); head.Batches.back().Pack(); - TString str = head.Batches.back().Serialize(); + TString str; + head.Batches.back().SerializeTo(str); auto header = ExtractHeader(str.c_str(), str.size()); TBatch batch(header, str.c_str() + header.ByteSize() + sizeof(ui16)); batch.Unpack(); @@ -186,7 +187,8 @@ Y_UNIT_TEST(TestBatchPacking) { batch.Unpack(); batch.Pack(); UNIT_ASSERT(batch.PackedData == s); - TString str = batch.Serialize(); + TString str; + batch.SerializeTo(str); auto header = ExtractHeader(str.c_str(), str.size()); TBatch batch2(header, str.c_str() + header.ByteSize() + sizeof(ui16)); batch2.Unpack(); |