diff options
author | azevaykin <azevaykin@yandex-team.com> | 2023-05-30 10:57:38 +0300 |
---|---|---|
committer | azevaykin <azevaykin@yandex-team.com> | 2023-05-30 10:57:38 +0300 |
commit | 3dab27170ff8ed8b6ce93179da5f767ee3586787 (patch) | |
tree | d1000ed9f460f4180fd161a9a5c027ba0d070c7c | |
parent | 18887976a96b91c933fbd89663fa916d9105e7e3 (diff) | |
download | ydb-3dab27170ff8ed8b6ce93179da5f767ee3586787.tar.gz |
Remove temporary strings from TBatch::Pack
Убрал в TBatch::Pack в двух местах промежуточные буфера:
https://a.yandex-team.ru/review/3930290/files/456ffebaded149bc22c6d146b5c58ea66b9e0b0f#file-ydb/core/persqueue/blob.cpp:L220
https://a.yandex-team.ru/review/3930290/files/89d42b1e1e0526c2920cd9ca91b39aee4c2f1490#file-ydb/core/persqueue/blob.cpp:L258
Было 7 млрд samples:

Стало 3 млрд samples:

-rw-r--r-- | ydb/core/persqueue/blob.cpp | 118 | ||||
-rw-r--r-- | ydb/core/persqueue/blob.h | 12 | ||||
-rw-r--r-- | ydb/core/persqueue/pq_l2_cache.h | 2 | ||||
-rw-r--r-- | ydb/core/persqueue/type_codecs.h | 2 | ||||
-rw-r--r-- | ydb/core/persqueue/type_codecs_defs.h | 13 | ||||
-rw-r--r-- | ydb/core/persqueue/type_coders.h | 69 | ||||
-rw-r--r-- | ydb/core/persqueue/ut/internals_ut.cpp | 4 | ||||
-rw-r--r-- | ydb/core/persqueue/ut/type_codecs_ut.cpp | 16 | ||||
-rw-r--r-- | ydb/core/util/blob_data_stream.h | 205 |
9 files changed, 141 insertions, 300 deletions
diff --git a/ydb/core/persqueue/blob.cpp b/ydb/core/persqueue/blob.cpp index 60f2f96708..f925152e19 100644 --- a/ydb/core/persqueue/blob.cpp +++ b/ydb/core/persqueue/blob.cpp @@ -185,7 +185,7 @@ TClientBlob TClientBlob::Deserialize(const char* data, ui32 size) return TClientBlob(sourceId, seqNo, dt, std::move(partData), writeTimestamp, createTimestamp, us, partitionKey, explicitHashKey); } -void TBatch::SerializeTo(TString& res) { +void TBatch::SerializeTo(TString& res) const{ Y_VERIFY(Packed); ui16 sz = Header.ByteSize(); @@ -194,30 +194,43 @@ void TBatch::SerializeTo(TString& res) { bool rs = Header.AppendToString(&res); Y_VERIFY(rs); - res += PackedData; + res.append(PackedData.data(), PackedData.size()); } template <typename TCodec> -TAutoPtr<NScheme::IChunkCoder> MakeChunk(TAutoPtr<TFlatBlobDataOutputStream>& output) +TAutoPtr<NScheme::IChunkCoder> MakeChunk(TBuffer& output) { - output.Reset(new TFlatBlobDataOutputStream); TCodec codec; - return codec.MakeChunk(output.Get()); + return codec.MakeChunk(output); } -void OutputChunk(TAutoPtr<NScheme::IChunkCoder> chunk, TAutoPtr<TFlatBlobDataOutputStream> output, TBuffer& res) +const ui32 CHUNK_SIZE_PLACEMENT = 0xCCCCCCCC; +ui32 WriteTemporaryChunkSize(TBuffer & output) { - chunk->Seal(); - ui32 size = output->CurrentBuffer().size(); - res.Append((const char*)&size, sizeof(ui32)); - res.Append(output->CurrentBuffer().data(), output->CurrentBuffer().size()); + ui32 sizeOffset = output.Size(); + + ui32 sizePlacement = CHUNK_SIZE_PLACEMENT; + output.Append((const char*)&sizePlacement, sizeof(ui32)); + + return sizeOffset; +} + +void WriteActualChunkSize(TBuffer& output, ui32 sizeOffset) +{ + ui32 currSize = output.size(); + Y_VERIFY(currSize >= sizeOffset + sizeof(ui32)); + ui32 size = currSize - sizeOffset - sizeof(ui32); + + ui32* sizePlacement = (ui32*)(output.data() + sizeOffset); + Y_VERIFY_DEBUG(*sizePlacement == CHUNK_SIZE_PLACEMENT); + *sizePlacement = size; } void TBatch::Pack() { if (Packed) return; Packed = true; - TBuffer res; + PackedData.Clear(); bool hasUncompressed = false; bool hasKinesis = false; @@ -255,11 +268,11 @@ void TBatch::Pack() { for (ui32 i = 0; i < Blobs.size(); ++i) { pos[start[reorderMap[TStringBuf(Blobs[i].SourceId)]]++] = i; } - TAutoPtr<TFlatBlobDataOutputStream> output; //output order { - auto chunk = MakeChunk<NScheme::TVarIntCodec<ui32,false>>(output); + ui32 sizeOffset = WriteTemporaryChunkSize(PackedData); + auto chunk = MakeChunk<NScheme::TVarIntCodec<ui32, false>>(PackedData); for (const auto& p : pos) { chunk->AddData((const char*)&p, sizeof(p)); } @@ -268,39 +281,47 @@ void TBatch::Pack() { for (const auto& p : start) { chunk->AddData((const char*)&p, sizeof(p)); } - OutputChunk(chunk, output, res); + chunk->Seal(); + WriteActualChunkSize(PackedData, sizeOffset); } //output SourceId { - auto chunk = MakeChunk<NScheme::TVarLenCodec<false>>(output); + ui32 sizeOffset = WriteTemporaryChunkSize(PackedData); + auto chunk = MakeChunk<NScheme::TVarLenCodec<false>>(PackedData); for (auto it = reorderMap.begin(); it != reorderMap.end(); ++it) { chunk->AddData(it->first.data(), it->first.size()); } - OutputChunk(chunk, output, res); + chunk->Seal(); + WriteActualChunkSize(PackedData, sizeOffset); } //output SeqNo { - auto chunk = MakeChunk<NScheme::TDeltaVarIntCodec<ui64, false>>(output); + ui32 sizeOffset = WriteTemporaryChunkSize(PackedData); + auto chunk = MakeChunk<NScheme::TDeltaVarIntCodec<ui64, false>>(PackedData); for (const auto& p : pos) { chunk->AddData((const char*)&Blobs[p].SeqNo, sizeof(ui64)); } - OutputChunk(chunk, output, res); + chunk->Seal(); + WriteActualChunkSize(PackedData, sizeOffset); } //output Data { - auto chunk = MakeChunk<NScheme::TVarLenCodec<false>>(output); + ui32 sizeOffset = WriteTemporaryChunkSize(PackedData); + auto chunk = MakeChunk<NScheme::TVarLenCodec<false>>(PackedData); for (const auto& p : pos) { chunk->AddData(Blobs[p].Data.data(), Blobs[p].Data.size()); } - OutputChunk(chunk, output, res); + chunk->Seal(); + WriteActualChunkSize(PackedData, sizeOffset); } //output PartData::Pos + payload { - auto chunk = MakeChunk<NScheme::TVarIntCodec<ui32, false>>(output); + ui32 sizeOffset = WriteTemporaryChunkSize(PackedData); + auto chunk = MakeChunk<NScheme::TVarIntCodec<ui32, false>>(PackedData); ui32 cnt = 0; for (ui32 i = 0; i < Blobs.size(); ++i) { if (Blobs[i].PartData) @@ -317,68 +338,79 @@ void TBatch::Pack() { chunk->AddData((const char*)&Blobs[i].PartData->TotalSize, sizeof(ui32)); } } - OutputChunk(chunk, output, res); + chunk->Seal(); + WriteActualChunkSize(PackedData, sizeOffset); } //output Wtime { - auto chunk = MakeChunk<NScheme::TDeltaVarIntCodec<ui64, false>>(output); + ui32 sizeOffset = WriteTemporaryChunkSize(PackedData); + auto chunk = MakeChunk<NScheme::TDeltaVarIntCodec<ui64, false>>(PackedData); for (ui32 i = 0; i < Blobs.size(); ++i) { ui64 writeTimestampMs = Blobs[i].WriteTimestamp.MilliSeconds(); chunk->AddData((const char*)&writeTimestampMs, sizeof(ui64)); } - OutputChunk(chunk, output, res); + chunk->Seal(); + WriteActualChunkSize(PackedData, sizeOffset); } if (hasKinesis) { { - auto chunk = MakeChunk<NScheme::TVarLenCodec<false>>(output); + ui32 sizeOffset = WriteTemporaryChunkSize(PackedData); + auto chunk = MakeChunk<NScheme::TVarLenCodec<false>>(PackedData); for (const auto &p : pos) { chunk->AddData(Blobs[p].PartitionKey.data(), Blobs[p].PartitionKey.size()); } - OutputChunk(chunk, output, res); + chunk->Seal(); + WriteActualChunkSize(PackedData, sizeOffset); } { - auto chunk = MakeChunk<NScheme::TVarLenCodec<false>>(output); + ui32 sizeOffset = WriteTemporaryChunkSize(PackedData); + auto chunk = MakeChunk<NScheme::TVarLenCodec<false>>(PackedData); for (const auto &p : pos) { chunk->AddData(Blobs[p].ExplicitHashKey.data(), Blobs[p].ExplicitHashKey.size()); } - OutputChunk(chunk, output, res); + chunk->Seal(); + WriteActualChunkSize(PackedData, sizeOffset); } } //output Ctime { - auto chunk = MakeChunk<NScheme::TDeltaVarIntCodec<ui64, false>>(output); + ui32 sizeOffset = WriteTemporaryChunkSize(PackedData); + auto chunk = MakeChunk<NScheme::TDeltaVarIntCodec<ui64, false>>(PackedData); for (ui32 i = 0; i < Blobs.size(); ++i) { ui64 createTimestampMs = Blobs[i].CreateTimestamp.MilliSeconds(); chunk->AddData((const char*)&createTimestampMs, sizeof(ui64)); } - OutputChunk(chunk, output, res); + chunk->Seal(); + WriteActualChunkSize(PackedData, sizeOffset); } //output Uncompressed if (hasUncompressed) { - auto chunk = MakeChunk<NScheme::TVarIntCodec<ui32, false>>(output); + ui32 sizeOffset = WriteTemporaryChunkSize(PackedData); + auto chunk = MakeChunk<NScheme::TVarIntCodec<ui32, false>>(PackedData); for (ui32 i = 0; i < Blobs.size(); ++i) { chunk->AddData((const char*)&Blobs[i].UncompressedSize, sizeof(ui32)); } - OutputChunk(chunk, output, res); + chunk->Seal(); + WriteActualChunkSize(PackedData, sizeOffset); } - PackedData = TString{res.Data(), res.Size()}; Header.SetPayloadSize(PackedData.size()); if (GetPackedSize() > GetUnpackedSize() + GetMaxHeaderSize()) { //packing is not effective, write as-is Header.SetFormat(NKikimrPQ::TBatchHeader::EUncompressed); - res.Clear(); + PackedData.Clear(); for (ui32 i = 0; i < Blobs.size(); ++i) { - Blobs[i].SerializeTo(res); + Blobs[i].SerializeTo(PackedData); } - PackedData = TString{res.Data(), res.Size()}; Header.SetPayloadSize(PackedData.size()); } + + TVector<TClientBlob> tmp; Blobs.swap(tmp); InternalPartsPos.resize(0); @@ -397,13 +429,13 @@ void TBatch::Unpack() { InternalPartsPos.push_back(i); } Y_VERIFY(InternalPartsPos.size() == GetInternalPartsCount()); - TString tmp; - tmp.swap(PackedData); + + PackedData.Clear(); } void TBatch::UnpackTo(TVector<TClientBlob> *blobs) { - Y_VERIFY(!PackedData.empty()); + Y_VERIFY(PackedData.size()); auto type = Header.GetFormat(); switch (type) { case NKikimrPQ::TBatchHeader::EUncompressed: @@ -427,7 +459,7 @@ NScheme::TDataRef GetChunk(const char*& data, const char *end) void TBatch::UnpackToType1(TVector<TClientBlob> *blobs) { Y_VERIFY(Header.GetFormat() == NKikimrPQ::TBatchHeader::ECompressed); - Y_VERIFY(!PackedData.empty()); + Y_VERIFY(PackedData.size()); ui32 totalBlobs = Header.GetCount() + Header.GetInternalPartsCount(); ui32 partsSize = 0; TVector<ui32> end; @@ -587,13 +619,13 @@ void TBatch::UnpackToType1(TVector<TClientBlob> *blobs) { void TBatch::UnpackToType0(TVector<TClientBlob> *blobs) { Y_VERIFY(Header.GetFormat() == NKikimrPQ::TBatchHeader::EUncompressed); - Y_VERIFY(!PackedData.empty()); + Y_VERIFY(PackedData.size()); ui32 shift = 0; for (ui32 i = 0; i < GetCount() + GetInternalPartsCount(); ++i) { Y_VERIFY(shift < PackedData.size()); - blobs->push_back(TClientBlob::Deserialize(PackedData.c_str() + shift, PackedData.size() - shift)); - shift += *(ui32*)(PackedData.c_str() + shift); + blobs->push_back(TClientBlob::Deserialize(PackedData.data() + shift, PackedData.size() - shift)); + shift += *(ui32*)(PackedData.data() + shift); } Y_VERIFY(shift == PackedData.size()); } diff --git a/ydb/core/persqueue/blob.h b/ydb/core/persqueue/blob.h index 6573ee4720..307cca7d0e 100644 --- a/ydb/core/persqueue/blob.h +++ b/ydb/core/persqueue/blob.h @@ -3,6 +3,7 @@ #include "key.h" #include <util/datetime/base.h> +#include <util/generic/size_literals.h> #include <util/generic/maybe.h> #include <util/generic/vector.h> @@ -97,6 +98,7 @@ struct TClientBlob { }; +static constexpr const ui32 MAX_BLOB_SIZE = 8_MB; //TBatch represents several clientBlobs. Can be in unpacked state(TVector<TClientBlob> blobs) //or packed(PackedData) @@ -112,14 +114,17 @@ struct TBatch { TVector<TClientBlob> Blobs; TVector<ui32> InternalPartsPos; NKikimrPQ::TBatchHeader Header; - TString PackedData; + TBuffer PackedData; TBatch() : Packed(false) - {} + { + PackedData.Reserve(8_MB); + } TBatch(const ui64 offset, const ui16 partNo, const TVector<TClientBlob>& blobs) : Packed(false) { + PackedData.Reserve(8_MB); Header.SetOffset(offset); Header.SetPartNo(partNo); Header.SetUnpackedSize(0); @@ -133,6 +138,7 @@ struct TBatch { TBatch(const ui64 offset, const ui16 partNo, const std::deque<TClientBlob>& blobs) : Packed(false) { + PackedData.Reserve(8_MB); Header.SetOffset(offset); Header.SetPartNo(partNo); Header.SetUnpackedSize(0); @@ -189,7 +195,7 @@ struct TBatch { void UnpackToType0(TVector<TClientBlob> *result); void UnpackToType1(TVector<TClientBlob> *result); - void SerializeTo(TString& res); + void SerializeTo(TString& res) const; ui32 FindPos(const ui64 offset, const ui16 partNo) const; diff --git a/ydb/core/persqueue/pq_l2_cache.h b/ydb/core/persqueue/pq_l2_cache.h index b76a2017d0..05f71baaa0 100644 --- a/ydb/core/persqueue/pq_l2_cache.h +++ b/ydb/core/persqueue/pq_l2_cache.h @@ -12,8 +12,6 @@ namespace NKikimr { namespace NPQ { -static const ui32 MAX_BLOB_SIZE = 8_MB; - struct TL2Counters { ::NMonitoring::TDynamicCounters::TCounterPtr TotalSize; ::NMonitoring::TDynamicCounters::TCounterPtr TotalCount; diff --git a/ydb/core/persqueue/type_codecs.h b/ydb/core/persqueue/type_codecs.h index 788e0fce0e..9450c8409a 100644 --- a/ydb/core/persqueue/type_codecs.h +++ b/ydb/core/persqueue/type_codecs.h @@ -25,7 +25,7 @@ public: return TCoder::Sig(); } - TAutoPtr<IChunkCoder> MakeChunk(TFlatBlobDataOutputStream* output) const override { + TAutoPtr<IChunkCoder> MakeChunk(TBuffer& output) const override { return new TCoder(output); } diff --git a/ydb/core/persqueue/type_codecs_defs.h b/ydb/core/persqueue/type_codecs_defs.h index 6b35c2b23b..e632c39d7d 100644 --- a/ydb/core/persqueue/type_codecs_defs.h +++ b/ydb/core/persqueue/type_codecs_defs.h @@ -1,8 +1,8 @@ #pragma once #include <ydb/core/scheme/scheme_type_id.h> -#include <ydb/core/util/blob_data_stream.h> +#include <util/generic/buffer.h> #include <util/generic/hash.h> #include <util/generic/ptr.h> #include <util/generic/singleton.h> @@ -182,7 +182,7 @@ public: virtual TCodecSig Signature() const = 0; - virtual TAutoPtr<IChunkCoder> MakeChunk(TFlatBlobDataOutputStream*) const = 0; + virtual TAutoPtr<IChunkCoder> MakeChunk(TBuffer&) const = 0; virtual IChunkDecoder::TPtr ReadChunk(const TDataRef&) const = 0; /// Read the chunk using 'this' codec (if the codec signature matches), @@ -191,7 +191,7 @@ public: }; /***************************************************************************//** - * TDataRef can either share the data (TString) or keep a reference (TStringBuf). + * TDataRef can either share the data (TString, TBuffer) or keep a reference (TStringBuf). * It uses short string optimization (SSO) to store small data (<= 16b). * TODO: Move to ydb/core/util ******************************************************************************/ @@ -217,7 +217,12 @@ public: : TDataRef(data.data(), data.size()) { } - /// Copy and take ownership of a small piece of data (<= 16b). + TDataRef(const TBuffer& data) + : TDataRef(data.data(), data.size()) + { + } + + /// Copy and take ownership of a small piece of data (<= 16b). TDataRef(const char* data, size_t size, bool) : ShortSize_(size) , IsNull_(0) diff --git a/ydb/core/persqueue/type_coders.h b/ydb/core/persqueue/type_coders.h index 26c2566d32..d8e91ce030 100644 --- a/ydb/core/persqueue/type_coders.h +++ b/ydb/core/persqueue/type_coders.h @@ -7,10 +7,17 @@ #include <library/cpp/packedtypes/zigzag.h> #include <util/generic/typetraits.h> +#include <util/generic/buffer.h> namespace NKikimr { namespace NScheme { + +template <typename T> +void AppendPOD(TBuffer& output, const T& dt) { + output.Append((const char*)&dt, (ui32)sizeof(T)); +} + //////////////////////////////////////////////////////////////////////////////// template <ui16 Type, bool IsNullable> @@ -39,10 +46,10 @@ public: inline void AddNull() { Mask.Append(0, 1); } inline void AddNonNull() { Mask.Append(1, 1); } - inline void Seal(TFlatBlobDataOutputStream* output) { + inline void Seal(TBuffer& output) { const ui32 maskSize = Mask.Words() * sizeof(TMask::TWord); - output->Write((const char*)Mask.Data(), maskSize); - output->Write((const char*)&maskSize, sizeof(maskSize)); // TODO: Reduce size of small masks (embed size into the first word). + output.Append((const char*)Mask.Data(), maskSize); + output.Append((const char*)&maskSize, sizeof(maskSize)); // TODO: Reduce size of small masks (embed size into the first word). } private: @@ -56,7 +63,7 @@ public: inline size_t MaxSize() const { return 0; } inline void AddNull() { Y_FAIL("Null values are not supported."); } inline void AddNonNull() { } - inline void Seal(TFlatBlobDataOutputStream*) { } + inline void Seal(TBuffer&) { } }; //////////////////////////////////////////////////////////////////////////////// @@ -64,11 +71,11 @@ public: template <size_t Size, bool IsNullable> class TFixedLenCoder : public TChunkCoderBase<ui16(TCodecType::FixedLen), IsNullable> { public: - TFixedLenCoder(TFlatBlobDataOutputStream* output) + TFixedLenCoder(TBuffer& output) : DataSize(0) , Output(output) { - output->WritePOD(TFixedLenCoder::Sig()); + AppendPOD(output, TFixedLenCoder::Sig()); } size_t GetEstimatedSize() const override { @@ -84,20 +91,20 @@ protected: Y_VERIFY(size == Size, "Size mismatch."); Mask.AddNonNull(); DataSize += Size; - Output->Write(data, size); + Output.Append(data, size); } void DoAddNull() override { Mask.AddNull(); static char zero[Size ? Size : 1]; - Output->Write(zero, Size); + Output.Append(zero, Size); // TODO: Consider using succinct Rank structure instead. } private: size_t DataSize; TCoderMask<IsNullable> Mask; - TFlatBlobDataOutputStream* Output; + TBuffer& Output; }; //////////////////////////////////////////////////////////////////////////////// @@ -105,11 +112,11 @@ private: template <bool IsNullable> class TVarLenCoder : public TChunkCoderBase<ui16(TCodecType::VarLen), IsNullable> { public: - TVarLenCoder(TFlatBlobDataOutputStream* output) + TVarLenCoder(TBuffer& output) : DataSize(0) , Output(output) { - output->WritePOD(TVarLenCoder::Sig()); + AppendPOD(output, TVarLenCoder::Sig()); } size_t GetEstimatedSize() const override { @@ -117,7 +124,7 @@ public: } void Seal() override { - Output->Write((const char*)Offsets.data(), Offsets.size() * sizeof(ui32)); + Output.Append((const char*)Offsets.data(), Offsets.size() * sizeof(ui32)); Mask.Seal(Output); } @@ -126,7 +133,7 @@ protected: Mask.AddNonNull(); DataSize += size; Offsets.push_back(DataSize); - Output->Write(data, size); + Output.Append(data, size); } void DoAddNull() override { @@ -138,7 +145,7 @@ private: ui32 DataSize; TVector<ui32> Offsets; TCoderMask<IsNullable> Mask; - TFlatBlobDataOutputStream* Output; + TBuffer& Output; }; //////////////////////////////////////////////////////////////////////////////// @@ -149,11 +156,11 @@ public: using TType = TIntType; using TSigned = std::make_signed_t<TType>; - inline size_t Save(TFlatBlobDataOutputStream* output, TType value) { + inline size_t Save(TBuffer& output, TType value) { const auto outValue = static_cast<i64>(value); // TODO: fix out_long(i32) char varIntOut[sizeof(outValue) + 1]; auto bytes = out_long(outValue, varIntOut); - output->Write(varIntOut, bytes); + output.Append(varIntOut, bytes); return bytes; } }; @@ -164,11 +171,11 @@ public: using TType = TIntType; using TSigned = std::make_signed_t<TType>; - inline size_t Save(TFlatBlobDataOutputStream* output, TType value) { + inline size_t Save(TBuffer& output, TType value) { const auto zigZagged = static_cast<i64>(ZigZagEncode(value)); char varIntOut[sizeof(zigZagged) + 1]; auto bytes = out_long(zigZagged, varIntOut); - output->Write(varIntOut, bytes); + output.Append(varIntOut, bytes); return bytes; } }; @@ -178,7 +185,7 @@ class TDeltaValueCoder : public TValueCoder { public: using TType = typename TValueCoder::TType; - inline size_t Save(TFlatBlobDataOutputStream* output, TType value) { + inline size_t Save(TBuffer& output, TType value) { auto delta = Rev ? Last - value : value - Last; Last = value; return TValueCoder::Save(output, delta); @@ -193,11 +200,11 @@ class TByteAlignedIntCoder : public TChunkCoderBase<CodecType, IsNullable> { public: using TType = typename TValueCoder::TType; - TByteAlignedIntCoder(TFlatBlobDataOutputStream* output) + TByteAlignedIntCoder(TBuffer& output) : DataSize(0) , Output(output) { - output->WritePOD(TByteAlignedIntCoder::Sig()); + AppendPOD(output, TByteAlignedIntCoder::Sig()); } size_t GetEstimatedSize() const override { @@ -222,14 +229,14 @@ protected: private: size_t DataSize; TCoderMask<IsNullable> Mask; - TFlatBlobDataOutputStream* Output; + TBuffer& Output; TValueCoder ValueCoder; }; template <typename TIntType, bool IsNullable> class TVarIntCoder : public TByteAlignedIntCoder<TVarIntValueCoder<TIntType>, ui16(TCodecType::VarInt), IsNullable> { public: - TVarIntCoder(TFlatBlobDataOutputStream* output) + TVarIntCoder(TBuffer& output) : TByteAlignedIntCoder<TVarIntValueCoder<TIntType>, ui16(TCodecType::VarInt), IsNullable>(output) { } }; @@ -237,7 +244,7 @@ public: template <typename TIntType, bool IsNullable> class TZigZagCoder : public TByteAlignedIntCoder<TZigZagValueCoder<TIntType>, ui16(TCodecType::ZigZag), IsNullable> { public: - TZigZagCoder(TFlatBlobDataOutputStream* output) + TZigZagCoder(TBuffer& output) : TByteAlignedIntCoder<TZigZagValueCoder<TIntType>, ui16(TCodecType::ZigZag), IsNullable>(output) { } }; @@ -245,7 +252,7 @@ public: template <typename TIntType, bool IsNullable> class TDeltaVarIntCoder : public TByteAlignedIntCoder<TDeltaValueCoder<TVarIntValueCoder<TIntType>>, ui16(TCodecType::DeltaVarInt), IsNullable> { public: - TDeltaVarIntCoder(TFlatBlobDataOutputStream* output) + TDeltaVarIntCoder(TBuffer& output) : TByteAlignedIntCoder<TDeltaValueCoder<TVarIntValueCoder<TIntType>>, ui16(TCodecType::DeltaVarInt), IsNullable>(output) { } }; @@ -253,7 +260,7 @@ public: template <typename TIntType, bool IsNullable> class TDeltaRevVarIntCoder : public TByteAlignedIntCoder<TDeltaValueCoder<TVarIntValueCoder<TIntType>, true>, ui16(TCodecType::DeltaRevVarInt), IsNullable> { public: - TDeltaRevVarIntCoder(TFlatBlobDataOutputStream* output) + TDeltaRevVarIntCoder(TBuffer& output) : TByteAlignedIntCoder<TDeltaValueCoder<TVarIntValueCoder<TIntType>, true>, ui16(TCodecType::DeltaRevVarInt), IsNullable>(output) { } }; @@ -261,7 +268,7 @@ public: template <typename TIntType, bool IsNullable> class TDeltaZigZagCoder : public TByteAlignedIntCoder<TDeltaValueCoder<TZigZagValueCoder<TIntType>>, ui16(TCodecType::DeltaZigZag), IsNullable> { public: - TDeltaZigZagCoder(TFlatBlobDataOutputStream* output) + TDeltaZigZagCoder(TBuffer& output) : TByteAlignedIntCoder<TDeltaValueCoder<TZigZagValueCoder<TIntType>>, ui16(TCodecType::DeltaZigZag), IsNullable>(output) { } }; @@ -271,10 +278,10 @@ public: template <bool IsNullable> class TBoolCoder : public TChunkCoderBase<ui16(TCodecType::Bool), IsNullable> { public: - TBoolCoder(TFlatBlobDataOutputStream* output) + TBoolCoder(TBuffer& output) : Output(output) { - output->WritePOD(TBoolCoder::Sig()); + AppendPOD(output, TBoolCoder::Sig()); } size_t GetEstimatedSize() const override { @@ -282,7 +289,7 @@ public: } void Seal() override { - Output->Write((const char*)Mask.Data(), Mask.Words() * sizeof(TMask::TWord)); + Output.Append((const char*)Mask.Data(), Mask.Words() * sizeof(TMask::TWord)); } protected: @@ -301,7 +308,7 @@ protected: protected: using TMask = TBitVector<ui16>; TMask Mask; - TFlatBlobDataOutputStream* Output; + TBuffer& Output; }; //////////////////////////////////////////////////////////////////////////////// diff --git a/ydb/core/persqueue/ut/internals_ut.cpp b/ydb/core/persqueue/ut/internals_ut.cpp index 7f953e042d..f366530ad4 100644 --- a/ydb/core/persqueue/ut/internals_ut.cpp +++ b/ydb/core/persqueue/ut/internals_ut.cpp @@ -182,11 +182,11 @@ Y_UNIT_TEST(TestBatchPacking) { )); } batch.Pack(); - TString s = batch.PackedData; + TBuffer b = batch.PackedData; UNIT_ASSERT(batch.Header.GetFormat() == NKikimrPQ::TBatchHeader::ECompressed); batch.Unpack(); batch.Pack(); - UNIT_ASSERT(batch.PackedData == s); + UNIT_ASSERT(batch.PackedData == b); TString str; batch.SerializeTo(str); auto header = ExtractHeader(str.c_str(), str.size()); diff --git a/ydb/core/persqueue/ut/type_codecs_ut.cpp b/ydb/core/persqueue/ut/type_codecs_ut.cpp index af0e1e1f53..09f0bb7563 100644 --- a/ydb/core/persqueue/ut/type_codecs_ut.cpp +++ b/ydb/core/persqueue/ut/type_codecs_ut.cpp @@ -19,8 +19,8 @@ using TDataRef = NScheme::TDataRef; Y_UNIT_TEST_SUITE(TTypeCodecsTest) { void Metrics(const TVector<TDataRef>& values, const ICodec* codec) { - TAutoPtr<TFlatBlobDataOutputStream> output(new TFlatBlobDataOutputStream()); - auto chunk = codec->MakeChunk(output.Get()); + TBuffer output; + auto chunk = codec->MakeChunk(output); auto start = TInstant::Now(); for (const auto& value : values) { @@ -32,11 +32,10 @@ Y_UNIT_TEST_SUITE(TTypeCodecsTest) { chunk->Seal(); auto duration = TInstant::Now() - start; - Cerr << "Size: " << output->GetCurrentOffset() << Endl; + Cerr << "Size: " << output.Size() << Endl; Cerr << "Create chunk: " << duration << Endl; - auto reading = codec->ReadChunk(output->GetBuffer()); - output.Reset(nullptr); + auto reading = codec->ReadChunk(output); start = TInstant::Now(); for (size_t i = 0, size = values.size(); i != size; ++i) { @@ -55,8 +54,8 @@ Y_UNIT_TEST_SUITE(TTypeCodecsTest) { } void TestImpl(const TVector<TDataRef>& values, const ICodec* codec) { - TAutoPtr<TFlatBlobDataOutputStream> output(new TFlatBlobDataOutputStream()); - auto chunk = codec->MakeChunk(output.Get()); + TBuffer output; + auto chunk = codec->MakeChunk(output); for (const auto& value : values) { if (value.IsNull()) chunk->AddNull(); @@ -65,9 +64,8 @@ Y_UNIT_TEST_SUITE(TTypeCodecsTest) { } chunk->Seal(); - auto reading = codec->ReadChunk(output->GetBuffer()); + auto reading = codec->ReadChunk(output); auto iter = reading->MakeIterator(); - output.Reset(nullptr); for (size_t i = 0; i != values.size(); ++i) { const auto& value = values[i]; diff --git a/ydb/core/util/blob_data_stream.h b/ydb/core/util/blob_data_stream.h deleted file mode 100644 index dac5c0234b..0000000000 --- a/ydb/core/util/blob_data_stream.h +++ /dev/null @@ -1,205 +0,0 @@ -#pragma once - -#include "defs.h" - -#include <util/generic/buffer.h> - -//////////////////////////////////////////// -namespace NKikimr { - -//////////////////////////////////////////// -class TFlatBlobDataInputStream { -public: - // - TFlatBlobDataInputStream() {} - TFlatBlobDataInputStream(const TStringBuf& buffer) - : Buffer(buffer) - {} - ~TFlatBlobDataInputStream() {} - - // - const void* ReadAt(ui32 offset, ui32 size) const { - Y_VERIFY_DEBUG(Buffer.size() >= offset + size); - Y_UNUSED(size); - - return Buffer.data() + offset; - } - - // - template<typename T> - const T* ReadAt(ui32 offset) const { - return (const T*)ReadAt(offset, sizeof(T)); - } - -private: - // - TStringBuf Buffer; -}; - -//////////////////////////////////////////// -class TFlatBlobDataOutputStream { -public: - // - TFlatBlobDataOutputStream() - {} - TFlatBlobDataOutputStream(TString bufferToGrab) - : Buffer(bufferToGrab) - {} - ~TFlatBlobDataOutputStream() {} - - // IBlobDataStream intarface - ui32 GetCurrentOffset() const { - return Buffer.size(); - } - - template<typename T> - void WritePOD(const T& dt) { - Write((const char*)&dt, (ui32)sizeof(T)); - } - - void Write(const char* data, ui32 size) { - Buffer.AppendNoAlias(data, size); - } - - void WriteAtPosition(ui32 position, const char* data, ui32 size) { - memcpy((char*)Buffer.data() + position, data, size); - } - - void Write(const TFlatBlobDataOutputStream* anotherStream) { - TStringBuf buf = anotherStream->CurrentBuffer(); - Write(buf.data(), buf.size()); - } - - TStringBuf CurrentBuffer() const { - return TStringBuf(Buffer.data(), Buffer.size()); - } - - const TString& GetBuffer() const { - return Buffer; - } - - //////////////////////////////////////////// - class TFutureValueBase { - public: - // - TFutureValueBase(TFlatBlobDataOutputStream* dataStream, ui32 position, ui32 size) - : DataStream(dataStream) - , Position(position) - , Size(size) - {} - ~TFutureValueBase() {} - - // - bool IsValid() const { return DataStream != nullptr; } - explicit operator bool() const { return IsValid(); } - bool operator !() const { return !IsValid(); } - - protected: - // - TFlatBlobDataOutputStream* DataStream; - ui32 Position; - ui32 Size; - }; - - - //////////////////////////////////////////// - class TFutureValue : public TFutureValueBase { - public: - // - TFutureValue(TFlatBlobDataOutputStream* dataStream = nullptr, ui32 position = 0, ui32 size = 0) - : TFutureValueBase(dataStream, position, size) - {} - ~TFutureValue() {} - - // - template<typename T> - void SetValue(const T& value) { - Y_VERIFY(sizeof(T) == Size); - DataStream->WriteAtPosition(Position, (const char*)&value, sizeof(T)); - } - }; - - //////////////////////////////////////////// - template<typename T> - class TFutureValuePOD : public TFutureValueBase { - public: - // - TFutureValuePOD(TFlatBlobDataOutputStream* dataStream = nullptr, ui32 position = 0, ui32 size = 0) - : TFutureValueBase(dataStream, position, size) - {} - ~TFutureValuePOD() {} - - // - void SetValue(const T& value) { - Y_VERIFY(sizeof(T) == Size); - DataStream->WriteAtPosition(Position, (const char*)&value, sizeof(T)); - } - }; - - // - TFutureValue FutureWrite(const char* data, ui32 size) { - ui32 currentOffset = GetCurrentOffset(); - Write(data, size); - return TFutureValue(this, currentOffset, size); - } - - template <typename T> - TFutureValuePOD<T> FutureWritePOD(const T& dt) { - ui32 currentOffset = GetCurrentOffset(); - WritePOD<T>(dt); - return TFutureValuePOD<T>(this, currentOffset, (ui32)sizeof(T)); - } - -private: - // - TString Buffer; -}; - -//////////////////////////////////////////// -class TBufferReader { -public: - // - TBufferReader(const TStringBuf& buffer) - : Buffer(buffer) - , Offset(0) - {} - ~TBufferReader() {} - - void SetOffset(ui32 newOffset) { - Offset = newOffset; - Y_VERIFY_DEBUG(Offset <= Buffer.size()); - } - void AddOffset(ui32 incOffset) { - Offset += incOffset; - Y_VERIFY_DEBUG(Offset <= Buffer.size()); - } - - // - template<typename T> - const T* ReadData() { - Y_VERIFY_DEBUG(Offset + sizeof(T) <= Buffer.size()); - T* retVal = (T*) (Buffer.data() + Offset); - Offset += sizeof(T); - return retVal; - } - - TStringBuf ReadData(ui32 len) { - Y_VERIFY_DEBUG(Offset + len <= Buffer.size()); - TStringBuf retVal(Buffer.data() + Offset, len); - Offset += len; - return retVal; - } - - TStringBuf GetBuffer() const { - Y_VERIFY_DEBUG(Buffer.size() > Offset); - return TStringBuf(Buffer.data() + Offset, Buffer.size() - Offset); - } - -private: - // - TStringBuf Buffer; - ui32 Offset; -}; - -} // end of the NKikimr namespace - |