aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorazevaykin <azevaykin@yandex-team.com>2023-05-30 10:57:38 +0300
committerazevaykin <azevaykin@yandex-team.com>2023-05-30 10:57:38 +0300
commit3dab27170ff8ed8b6ce93179da5f767ee3586787 (patch)
treed1000ed9f460f4180fd161a9a5c027ba0d070c7c
parent18887976a96b91c933fbd89663fa916d9105e7e3 (diff)
downloadydb-3dab27170ff8ed8b6ce93179da5f767ee3586787.tar.gz
Remove temporary strings from TBatch::Pack
Убрал в TBatch::Pack в двух местах промежуточные буфера: https://a.yandex-team.ru/review/3930290/files/456ffebaded149bc22c6d146b5c58ea66b9e0b0f#file-ydb/core/persqueue/blob.cpp:L220 https://a.yandex-team.ru/review/3930290/files/89d42b1e1e0526c2920cd9ca91b39aee4c2f1490#file-ydb/core/persqueue/blob.cpp:L258 Было 7 млрд samples: ![](https://arcanum.s3.mds.yandex.net/files/azevaykin/0aInkQq_YtMbkHfiP3KDw) Стало 3 млрд samples: ![](https://arcanum.s3.mds.yandex.net/files/azevaykin/fbgba8t5eyKxkqGuPKPqX)
-rw-r--r--ydb/core/persqueue/blob.cpp118
-rw-r--r--ydb/core/persqueue/blob.h12
-rw-r--r--ydb/core/persqueue/pq_l2_cache.h2
-rw-r--r--ydb/core/persqueue/type_codecs.h2
-rw-r--r--ydb/core/persqueue/type_codecs_defs.h13
-rw-r--r--ydb/core/persqueue/type_coders.h69
-rw-r--r--ydb/core/persqueue/ut/internals_ut.cpp4
-rw-r--r--ydb/core/persqueue/ut/type_codecs_ut.cpp16
-rw-r--r--ydb/core/util/blob_data_stream.h205
9 files changed, 141 insertions, 300 deletions
diff --git a/ydb/core/persqueue/blob.cpp b/ydb/core/persqueue/blob.cpp
index 60f2f96708..f925152e19 100644
--- a/ydb/core/persqueue/blob.cpp
+++ b/ydb/core/persqueue/blob.cpp
@@ -185,7 +185,7 @@ TClientBlob TClientBlob::Deserialize(const char* data, ui32 size)
return TClientBlob(sourceId, seqNo, dt, std::move(partData), writeTimestamp, createTimestamp, us, partitionKey, explicitHashKey);
}
-void TBatch::SerializeTo(TString& res) {
+void TBatch::SerializeTo(TString& res) const{
Y_VERIFY(Packed);
ui16 sz = Header.ByteSize();
@@ -194,30 +194,43 @@ void TBatch::SerializeTo(TString& res) {
bool rs = Header.AppendToString(&res);
Y_VERIFY(rs);
- res += PackedData;
+ res.append(PackedData.data(), PackedData.size());
}
template <typename TCodec>
-TAutoPtr<NScheme::IChunkCoder> MakeChunk(TAutoPtr<TFlatBlobDataOutputStream>& output)
+TAutoPtr<NScheme::IChunkCoder> MakeChunk(TBuffer& output)
{
- output.Reset(new TFlatBlobDataOutputStream);
TCodec codec;
- return codec.MakeChunk(output.Get());
+ return codec.MakeChunk(output);
}
-void OutputChunk(TAutoPtr<NScheme::IChunkCoder> chunk, TAutoPtr<TFlatBlobDataOutputStream> output, TBuffer& res)
+const ui32 CHUNK_SIZE_PLACEMENT = 0xCCCCCCCC;
+ui32 WriteTemporaryChunkSize(TBuffer & output)
{
- chunk->Seal();
- ui32 size = output->CurrentBuffer().size();
- res.Append((const char*)&size, sizeof(ui32));
- res.Append(output->CurrentBuffer().data(), output->CurrentBuffer().size());
+ ui32 sizeOffset = output.Size();
+
+ ui32 sizePlacement = CHUNK_SIZE_PLACEMENT;
+ output.Append((const char*)&sizePlacement, sizeof(ui32));
+
+ return sizeOffset;
+}
+
+void WriteActualChunkSize(TBuffer& output, ui32 sizeOffset)
+{
+ ui32 currSize = output.size();
+ Y_VERIFY(currSize >= sizeOffset + sizeof(ui32));
+ ui32 size = currSize - sizeOffset - sizeof(ui32);
+
+ ui32* sizePlacement = (ui32*)(output.data() + sizeOffset);
+ Y_VERIFY_DEBUG(*sizePlacement == CHUNK_SIZE_PLACEMENT);
+ *sizePlacement = size;
}
void TBatch::Pack() {
if (Packed)
return;
Packed = true;
- TBuffer res;
+ PackedData.Clear();
bool hasUncompressed = false;
bool hasKinesis = false;
@@ -255,11 +268,11 @@ void TBatch::Pack() {
for (ui32 i = 0; i < Blobs.size(); ++i) {
pos[start[reorderMap[TStringBuf(Blobs[i].SourceId)]]++] = i;
}
- TAutoPtr<TFlatBlobDataOutputStream> output;
//output order
{
- auto chunk = MakeChunk<NScheme::TVarIntCodec<ui32,false>>(output);
+ ui32 sizeOffset = WriteTemporaryChunkSize(PackedData);
+ auto chunk = MakeChunk<NScheme::TVarIntCodec<ui32, false>>(PackedData);
for (const auto& p : pos) {
chunk->AddData((const char*)&p, sizeof(p));
}
@@ -268,39 +281,47 @@ void TBatch::Pack() {
for (const auto& p : start) {
chunk->AddData((const char*)&p, sizeof(p));
}
- OutputChunk(chunk, output, res);
+ chunk->Seal();
+ WriteActualChunkSize(PackedData, sizeOffset);
}
//output SourceId
{
- auto chunk = MakeChunk<NScheme::TVarLenCodec<false>>(output);
+ ui32 sizeOffset = WriteTemporaryChunkSize(PackedData);
+ auto chunk = MakeChunk<NScheme::TVarLenCodec<false>>(PackedData);
for (auto it = reorderMap.begin(); it != reorderMap.end(); ++it) {
chunk->AddData(it->first.data(), it->first.size());
}
- OutputChunk(chunk, output, res);
+ chunk->Seal();
+ WriteActualChunkSize(PackedData, sizeOffset);
}
//output SeqNo
{
- auto chunk = MakeChunk<NScheme::TDeltaVarIntCodec<ui64, false>>(output);
+ ui32 sizeOffset = WriteTemporaryChunkSize(PackedData);
+ auto chunk = MakeChunk<NScheme::TDeltaVarIntCodec<ui64, false>>(PackedData);
for (const auto& p : pos) {
chunk->AddData((const char*)&Blobs[p].SeqNo, sizeof(ui64));
}
- OutputChunk(chunk, output, res);
+ chunk->Seal();
+ WriteActualChunkSize(PackedData, sizeOffset);
}
//output Data
{
- auto chunk = MakeChunk<NScheme::TVarLenCodec<false>>(output);
+ ui32 sizeOffset = WriteTemporaryChunkSize(PackedData);
+ auto chunk = MakeChunk<NScheme::TVarLenCodec<false>>(PackedData);
for (const auto& p : pos) {
chunk->AddData(Blobs[p].Data.data(), Blobs[p].Data.size());
}
- OutputChunk(chunk, output, res);
+ chunk->Seal();
+ WriteActualChunkSize(PackedData, sizeOffset);
}
//output PartData::Pos + payload
{
- auto chunk = MakeChunk<NScheme::TVarIntCodec<ui32, false>>(output);
+ ui32 sizeOffset = WriteTemporaryChunkSize(PackedData);
+ auto chunk = MakeChunk<NScheme::TVarIntCodec<ui32, false>>(PackedData);
ui32 cnt = 0;
for (ui32 i = 0; i < Blobs.size(); ++i) {
if (Blobs[i].PartData)
@@ -317,68 +338,79 @@ void TBatch::Pack() {
chunk->AddData((const char*)&Blobs[i].PartData->TotalSize, sizeof(ui32));
}
}
- OutputChunk(chunk, output, res);
+ chunk->Seal();
+ WriteActualChunkSize(PackedData, sizeOffset);
}
//output Wtime
{
- auto chunk = MakeChunk<NScheme::TDeltaVarIntCodec<ui64, false>>(output);
+ ui32 sizeOffset = WriteTemporaryChunkSize(PackedData);
+ auto chunk = MakeChunk<NScheme::TDeltaVarIntCodec<ui64, false>>(PackedData);
for (ui32 i = 0; i < Blobs.size(); ++i) {
ui64 writeTimestampMs = Blobs[i].WriteTimestamp.MilliSeconds();
chunk->AddData((const char*)&writeTimestampMs, sizeof(ui64));
}
- OutputChunk(chunk, output, res);
+ chunk->Seal();
+ WriteActualChunkSize(PackedData, sizeOffset);
}
if (hasKinesis) {
{
- auto chunk = MakeChunk<NScheme::TVarLenCodec<false>>(output);
+ ui32 sizeOffset = WriteTemporaryChunkSize(PackedData);
+ auto chunk = MakeChunk<NScheme::TVarLenCodec<false>>(PackedData);
for (const auto &p : pos) {
chunk->AddData(Blobs[p].PartitionKey.data(), Blobs[p].PartitionKey.size());
}
- OutputChunk(chunk, output, res);
+ chunk->Seal();
+ WriteActualChunkSize(PackedData, sizeOffset);
}
{
- auto chunk = MakeChunk<NScheme::TVarLenCodec<false>>(output);
+ ui32 sizeOffset = WriteTemporaryChunkSize(PackedData);
+ auto chunk = MakeChunk<NScheme::TVarLenCodec<false>>(PackedData);
for (const auto &p : pos) {
chunk->AddData(Blobs[p].ExplicitHashKey.data(), Blobs[p].ExplicitHashKey.size());
}
- OutputChunk(chunk, output, res);
+ chunk->Seal();
+ WriteActualChunkSize(PackedData, sizeOffset);
}
}
//output Ctime
{
- auto chunk = MakeChunk<NScheme::TDeltaVarIntCodec<ui64, false>>(output);
+ ui32 sizeOffset = WriteTemporaryChunkSize(PackedData);
+ auto chunk = MakeChunk<NScheme::TDeltaVarIntCodec<ui64, false>>(PackedData);
for (ui32 i = 0; i < Blobs.size(); ++i) {
ui64 createTimestampMs = Blobs[i].CreateTimestamp.MilliSeconds();
chunk->AddData((const char*)&createTimestampMs, sizeof(ui64));
}
- OutputChunk(chunk, output, res);
+ chunk->Seal();
+ WriteActualChunkSize(PackedData, sizeOffset);
}
//output Uncompressed
if (hasUncompressed) {
- auto chunk = MakeChunk<NScheme::TVarIntCodec<ui32, false>>(output);
+ ui32 sizeOffset = WriteTemporaryChunkSize(PackedData);
+ auto chunk = MakeChunk<NScheme::TVarIntCodec<ui32, false>>(PackedData);
for (ui32 i = 0; i < Blobs.size(); ++i) {
chunk->AddData((const char*)&Blobs[i].UncompressedSize, sizeof(ui32));
}
- OutputChunk(chunk, output, res);
+ chunk->Seal();
+ WriteActualChunkSize(PackedData, sizeOffset);
}
- PackedData = TString{res.Data(), res.Size()};
Header.SetPayloadSize(PackedData.size());
if (GetPackedSize() > GetUnpackedSize() + GetMaxHeaderSize()) { //packing is not effective, write as-is
Header.SetFormat(NKikimrPQ::TBatchHeader::EUncompressed);
- res.Clear();
+ PackedData.Clear();
for (ui32 i = 0; i < Blobs.size(); ++i) {
- Blobs[i].SerializeTo(res);
+ Blobs[i].SerializeTo(PackedData);
}
- PackedData = TString{res.Data(), res.Size()};
Header.SetPayloadSize(PackedData.size());
}
+
+
TVector<TClientBlob> tmp;
Blobs.swap(tmp);
InternalPartsPos.resize(0);
@@ -397,13 +429,13 @@ void TBatch::Unpack() {
InternalPartsPos.push_back(i);
}
Y_VERIFY(InternalPartsPos.size() == GetInternalPartsCount());
- TString tmp;
- tmp.swap(PackedData);
+
+ PackedData.Clear();
}
void TBatch::UnpackTo(TVector<TClientBlob> *blobs)
{
- Y_VERIFY(!PackedData.empty());
+ Y_VERIFY(PackedData.size());
auto type = Header.GetFormat();
switch (type) {
case NKikimrPQ::TBatchHeader::EUncompressed:
@@ -427,7 +459,7 @@ NScheme::TDataRef GetChunk(const char*& data, const char *end)
void TBatch::UnpackToType1(TVector<TClientBlob> *blobs) {
Y_VERIFY(Header.GetFormat() == NKikimrPQ::TBatchHeader::ECompressed);
- Y_VERIFY(!PackedData.empty());
+ Y_VERIFY(PackedData.size());
ui32 totalBlobs = Header.GetCount() + Header.GetInternalPartsCount();
ui32 partsSize = 0;
TVector<ui32> end;
@@ -587,13 +619,13 @@ void TBatch::UnpackToType1(TVector<TClientBlob> *blobs) {
void TBatch::UnpackToType0(TVector<TClientBlob> *blobs) {
Y_VERIFY(Header.GetFormat() == NKikimrPQ::TBatchHeader::EUncompressed);
- Y_VERIFY(!PackedData.empty());
+ Y_VERIFY(PackedData.size());
ui32 shift = 0;
for (ui32 i = 0; i < GetCount() + GetInternalPartsCount(); ++i) {
Y_VERIFY(shift < PackedData.size());
- blobs->push_back(TClientBlob::Deserialize(PackedData.c_str() + shift, PackedData.size() - shift));
- shift += *(ui32*)(PackedData.c_str() + shift);
+ blobs->push_back(TClientBlob::Deserialize(PackedData.data() + shift, PackedData.size() - shift));
+ shift += *(ui32*)(PackedData.data() + shift);
}
Y_VERIFY(shift == PackedData.size());
}
diff --git a/ydb/core/persqueue/blob.h b/ydb/core/persqueue/blob.h
index 6573ee4720..307cca7d0e 100644
--- a/ydb/core/persqueue/blob.h
+++ b/ydb/core/persqueue/blob.h
@@ -3,6 +3,7 @@
#include "key.h"
#include <util/datetime/base.h>
+#include <util/generic/size_literals.h>
#include <util/generic/maybe.h>
#include <util/generic/vector.h>
@@ -97,6 +98,7 @@ struct TClientBlob {
};
+static constexpr const ui32 MAX_BLOB_SIZE = 8_MB;
//TBatch represents several clientBlobs. Can be in unpacked state(TVector<TClientBlob> blobs)
//or packed(PackedData)
@@ -112,14 +114,17 @@ struct TBatch {
TVector<TClientBlob> Blobs;
TVector<ui32> InternalPartsPos;
NKikimrPQ::TBatchHeader Header;
- TString PackedData;
+ TBuffer PackedData;
TBatch()
: Packed(false)
- {}
+ {
+ PackedData.Reserve(8_MB);
+ }
TBatch(const ui64 offset, const ui16 partNo, const TVector<TClientBlob>& blobs)
: Packed(false)
{
+ PackedData.Reserve(8_MB);
Header.SetOffset(offset);
Header.SetPartNo(partNo);
Header.SetUnpackedSize(0);
@@ -133,6 +138,7 @@ struct TBatch {
TBatch(const ui64 offset, const ui16 partNo, const std::deque<TClientBlob>& blobs)
: Packed(false)
{
+ PackedData.Reserve(8_MB);
Header.SetOffset(offset);
Header.SetPartNo(partNo);
Header.SetUnpackedSize(0);
@@ -189,7 +195,7 @@ struct TBatch {
void UnpackToType0(TVector<TClientBlob> *result);
void UnpackToType1(TVector<TClientBlob> *result);
- void SerializeTo(TString& res);
+ void SerializeTo(TString& res) const;
ui32 FindPos(const ui64 offset, const ui16 partNo) const;
diff --git a/ydb/core/persqueue/pq_l2_cache.h b/ydb/core/persqueue/pq_l2_cache.h
index b76a2017d0..05f71baaa0 100644
--- a/ydb/core/persqueue/pq_l2_cache.h
+++ b/ydb/core/persqueue/pq_l2_cache.h
@@ -12,8 +12,6 @@
namespace NKikimr {
namespace NPQ {
-static const ui32 MAX_BLOB_SIZE = 8_MB;
-
struct TL2Counters {
::NMonitoring::TDynamicCounters::TCounterPtr TotalSize;
::NMonitoring::TDynamicCounters::TCounterPtr TotalCount;
diff --git a/ydb/core/persqueue/type_codecs.h b/ydb/core/persqueue/type_codecs.h
index 788e0fce0e..9450c8409a 100644
--- a/ydb/core/persqueue/type_codecs.h
+++ b/ydb/core/persqueue/type_codecs.h
@@ -25,7 +25,7 @@ public:
return TCoder::Sig();
}
- TAutoPtr<IChunkCoder> MakeChunk(TFlatBlobDataOutputStream* output) const override {
+ TAutoPtr<IChunkCoder> MakeChunk(TBuffer& output) const override {
return new TCoder(output);
}
diff --git a/ydb/core/persqueue/type_codecs_defs.h b/ydb/core/persqueue/type_codecs_defs.h
index 6b35c2b23b..e632c39d7d 100644
--- a/ydb/core/persqueue/type_codecs_defs.h
+++ b/ydb/core/persqueue/type_codecs_defs.h
@@ -1,8 +1,8 @@
#pragma once
#include <ydb/core/scheme/scheme_type_id.h>
-#include <ydb/core/util/blob_data_stream.h>
+#include <util/generic/buffer.h>
#include <util/generic/hash.h>
#include <util/generic/ptr.h>
#include <util/generic/singleton.h>
@@ -182,7 +182,7 @@ public:
virtual TCodecSig Signature() const = 0;
- virtual TAutoPtr<IChunkCoder> MakeChunk(TFlatBlobDataOutputStream*) const = 0;
+ virtual TAutoPtr<IChunkCoder> MakeChunk(TBuffer&) const = 0;
virtual IChunkDecoder::TPtr ReadChunk(const TDataRef&) const = 0;
/// Read the chunk using 'this' codec (if the codec signature matches),
@@ -191,7 +191,7 @@ public:
};
/***************************************************************************//**
- * TDataRef can either share the data (TString) or keep a reference (TStringBuf).
+ * TDataRef can either share the data (TString, TBuffer) or keep a reference (TStringBuf).
* It uses short string optimization (SSO) to store small data (<= 16b).
* TODO: Move to ydb/core/util
******************************************************************************/
@@ -217,7 +217,12 @@ public:
: TDataRef(data.data(), data.size())
{ }
- /// Copy and take ownership of a small piece of data (<= 16b).
+ TDataRef(const TBuffer& data)
+ : TDataRef(data.data(), data.size())
+ {
+ }
+
+ /// Copy and take ownership of a small piece of data (<= 16b).
TDataRef(const char* data, size_t size, bool)
: ShortSize_(size)
, IsNull_(0)
diff --git a/ydb/core/persqueue/type_coders.h b/ydb/core/persqueue/type_coders.h
index 26c2566d32..d8e91ce030 100644
--- a/ydb/core/persqueue/type_coders.h
+++ b/ydb/core/persqueue/type_coders.h
@@ -7,10 +7,17 @@
#include <library/cpp/packedtypes/zigzag.h>
#include <util/generic/typetraits.h>
+#include <util/generic/buffer.h>
namespace NKikimr {
namespace NScheme {
+
+template <typename T>
+void AppendPOD(TBuffer& output, const T& dt) {
+ output.Append((const char*)&dt, (ui32)sizeof(T));
+}
+
////////////////////////////////////////////////////////////////////////////////
template <ui16 Type, bool IsNullable>
@@ -39,10 +46,10 @@ public:
inline void AddNull() { Mask.Append(0, 1); }
inline void AddNonNull() { Mask.Append(1, 1); }
- inline void Seal(TFlatBlobDataOutputStream* output) {
+ inline void Seal(TBuffer& output) {
const ui32 maskSize = Mask.Words() * sizeof(TMask::TWord);
- output->Write((const char*)Mask.Data(), maskSize);
- output->Write((const char*)&maskSize, sizeof(maskSize)); // TODO: Reduce size of small masks (embed size into the first word).
+ output.Append((const char*)Mask.Data(), maskSize);
+ output.Append((const char*)&maskSize, sizeof(maskSize)); // TODO: Reduce size of small masks (embed size into the first word).
}
private:
@@ -56,7 +63,7 @@ public:
inline size_t MaxSize() const { return 0; }
inline void AddNull() { Y_FAIL("Null values are not supported."); }
inline void AddNonNull() { }
- inline void Seal(TFlatBlobDataOutputStream*) { }
+ inline void Seal(TBuffer&) { }
};
////////////////////////////////////////////////////////////////////////////////
@@ -64,11 +71,11 @@ public:
template <size_t Size, bool IsNullable>
class TFixedLenCoder : public TChunkCoderBase<ui16(TCodecType::FixedLen), IsNullable> {
public:
- TFixedLenCoder(TFlatBlobDataOutputStream* output)
+ TFixedLenCoder(TBuffer& output)
: DataSize(0)
, Output(output)
{
- output->WritePOD(TFixedLenCoder::Sig());
+ AppendPOD(output, TFixedLenCoder::Sig());
}
size_t GetEstimatedSize() const override {
@@ -84,20 +91,20 @@ protected:
Y_VERIFY(size == Size, "Size mismatch.");
Mask.AddNonNull();
DataSize += Size;
- Output->Write(data, size);
+ Output.Append(data, size);
}
void DoAddNull() override {
Mask.AddNull();
static char zero[Size ? Size : 1];
- Output->Write(zero, Size);
+ Output.Append(zero, Size);
// TODO: Consider using succinct Rank structure instead.
}
private:
size_t DataSize;
TCoderMask<IsNullable> Mask;
- TFlatBlobDataOutputStream* Output;
+ TBuffer& Output;
};
////////////////////////////////////////////////////////////////////////////////
@@ -105,11 +112,11 @@ private:
template <bool IsNullable>
class TVarLenCoder : public TChunkCoderBase<ui16(TCodecType::VarLen), IsNullable> {
public:
- TVarLenCoder(TFlatBlobDataOutputStream* output)
+ TVarLenCoder(TBuffer& output)
: DataSize(0)
, Output(output)
{
- output->WritePOD(TVarLenCoder::Sig());
+ AppendPOD(output, TVarLenCoder::Sig());
}
size_t GetEstimatedSize() const override {
@@ -117,7 +124,7 @@ public:
}
void Seal() override {
- Output->Write((const char*)Offsets.data(), Offsets.size() * sizeof(ui32));
+ Output.Append((const char*)Offsets.data(), Offsets.size() * sizeof(ui32));
Mask.Seal(Output);
}
@@ -126,7 +133,7 @@ protected:
Mask.AddNonNull();
DataSize += size;
Offsets.push_back(DataSize);
- Output->Write(data, size);
+ Output.Append(data, size);
}
void DoAddNull() override {
@@ -138,7 +145,7 @@ private:
ui32 DataSize;
TVector<ui32> Offsets;
TCoderMask<IsNullable> Mask;
- TFlatBlobDataOutputStream* Output;
+ TBuffer& Output;
};
////////////////////////////////////////////////////////////////////////////////
@@ -149,11 +156,11 @@ public:
using TType = TIntType;
using TSigned = std::make_signed_t<TType>;
- inline size_t Save(TFlatBlobDataOutputStream* output, TType value) {
+ inline size_t Save(TBuffer& output, TType value) {
const auto outValue = static_cast<i64>(value); // TODO: fix out_long(i32)
char varIntOut[sizeof(outValue) + 1];
auto bytes = out_long(outValue, varIntOut);
- output->Write(varIntOut, bytes);
+ output.Append(varIntOut, bytes);
return bytes;
}
};
@@ -164,11 +171,11 @@ public:
using TType = TIntType;
using TSigned = std::make_signed_t<TType>;
- inline size_t Save(TFlatBlobDataOutputStream* output, TType value) {
+ inline size_t Save(TBuffer& output, TType value) {
const auto zigZagged = static_cast<i64>(ZigZagEncode(value));
char varIntOut[sizeof(zigZagged) + 1];
auto bytes = out_long(zigZagged, varIntOut);
- output->Write(varIntOut, bytes);
+ output.Append(varIntOut, bytes);
return bytes;
}
};
@@ -178,7 +185,7 @@ class TDeltaValueCoder : public TValueCoder {
public:
using TType = typename TValueCoder::TType;
- inline size_t Save(TFlatBlobDataOutputStream* output, TType value) {
+ inline size_t Save(TBuffer& output, TType value) {
auto delta = Rev ? Last - value : value - Last;
Last = value;
return TValueCoder::Save(output, delta);
@@ -193,11 +200,11 @@ class TByteAlignedIntCoder : public TChunkCoderBase<CodecType, IsNullable> {
public:
using TType = typename TValueCoder::TType;
- TByteAlignedIntCoder(TFlatBlobDataOutputStream* output)
+ TByteAlignedIntCoder(TBuffer& output)
: DataSize(0)
, Output(output)
{
- output->WritePOD(TByteAlignedIntCoder::Sig());
+ AppendPOD(output, TByteAlignedIntCoder::Sig());
}
size_t GetEstimatedSize() const override {
@@ -222,14 +229,14 @@ protected:
private:
size_t DataSize;
TCoderMask<IsNullable> Mask;
- TFlatBlobDataOutputStream* Output;
+ TBuffer& Output;
TValueCoder ValueCoder;
};
template <typename TIntType, bool IsNullable>
class TVarIntCoder : public TByteAlignedIntCoder<TVarIntValueCoder<TIntType>, ui16(TCodecType::VarInt), IsNullable> {
public:
- TVarIntCoder(TFlatBlobDataOutputStream* output)
+ TVarIntCoder(TBuffer& output)
: TByteAlignedIntCoder<TVarIntValueCoder<TIntType>, ui16(TCodecType::VarInt), IsNullable>(output)
{ }
};
@@ -237,7 +244,7 @@ public:
template <typename TIntType, bool IsNullable>
class TZigZagCoder : public TByteAlignedIntCoder<TZigZagValueCoder<TIntType>, ui16(TCodecType::ZigZag), IsNullable> {
public:
- TZigZagCoder(TFlatBlobDataOutputStream* output)
+ TZigZagCoder(TBuffer& output)
: TByteAlignedIntCoder<TZigZagValueCoder<TIntType>, ui16(TCodecType::ZigZag), IsNullable>(output)
{ }
};
@@ -245,7 +252,7 @@ public:
template <typename TIntType, bool IsNullable>
class TDeltaVarIntCoder : public TByteAlignedIntCoder<TDeltaValueCoder<TVarIntValueCoder<TIntType>>, ui16(TCodecType::DeltaVarInt), IsNullable> {
public:
- TDeltaVarIntCoder(TFlatBlobDataOutputStream* output)
+ TDeltaVarIntCoder(TBuffer& output)
: TByteAlignedIntCoder<TDeltaValueCoder<TVarIntValueCoder<TIntType>>, ui16(TCodecType::DeltaVarInt), IsNullable>(output)
{ }
};
@@ -253,7 +260,7 @@ public:
template <typename TIntType, bool IsNullable>
class TDeltaRevVarIntCoder : public TByteAlignedIntCoder<TDeltaValueCoder<TVarIntValueCoder<TIntType>, true>, ui16(TCodecType::DeltaRevVarInt), IsNullable> {
public:
- TDeltaRevVarIntCoder(TFlatBlobDataOutputStream* output)
+ TDeltaRevVarIntCoder(TBuffer& output)
: TByteAlignedIntCoder<TDeltaValueCoder<TVarIntValueCoder<TIntType>, true>, ui16(TCodecType::DeltaRevVarInt), IsNullable>(output)
{ }
};
@@ -261,7 +268,7 @@ public:
template <typename TIntType, bool IsNullable>
class TDeltaZigZagCoder : public TByteAlignedIntCoder<TDeltaValueCoder<TZigZagValueCoder<TIntType>>, ui16(TCodecType::DeltaZigZag), IsNullable> {
public:
- TDeltaZigZagCoder(TFlatBlobDataOutputStream* output)
+ TDeltaZigZagCoder(TBuffer& output)
: TByteAlignedIntCoder<TDeltaValueCoder<TZigZagValueCoder<TIntType>>, ui16(TCodecType::DeltaZigZag), IsNullable>(output)
{ }
};
@@ -271,10 +278,10 @@ public:
template <bool IsNullable>
class TBoolCoder : public TChunkCoderBase<ui16(TCodecType::Bool), IsNullable> {
public:
- TBoolCoder(TFlatBlobDataOutputStream* output)
+ TBoolCoder(TBuffer& output)
: Output(output)
{
- output->WritePOD(TBoolCoder::Sig());
+ AppendPOD(output, TBoolCoder::Sig());
}
size_t GetEstimatedSize() const override {
@@ -282,7 +289,7 @@ public:
}
void Seal() override {
- Output->Write((const char*)Mask.Data(), Mask.Words() * sizeof(TMask::TWord));
+ Output.Append((const char*)Mask.Data(), Mask.Words() * sizeof(TMask::TWord));
}
protected:
@@ -301,7 +308,7 @@ protected:
protected:
using TMask = TBitVector<ui16>;
TMask Mask;
- TFlatBlobDataOutputStream* Output;
+ TBuffer& Output;
};
////////////////////////////////////////////////////////////////////////////////
diff --git a/ydb/core/persqueue/ut/internals_ut.cpp b/ydb/core/persqueue/ut/internals_ut.cpp
index 7f953e042d..f366530ad4 100644
--- a/ydb/core/persqueue/ut/internals_ut.cpp
+++ b/ydb/core/persqueue/ut/internals_ut.cpp
@@ -182,11 +182,11 @@ Y_UNIT_TEST(TestBatchPacking) {
));
}
batch.Pack();
- TString s = batch.PackedData;
+ TBuffer b = batch.PackedData;
UNIT_ASSERT(batch.Header.GetFormat() == NKikimrPQ::TBatchHeader::ECompressed);
batch.Unpack();
batch.Pack();
- UNIT_ASSERT(batch.PackedData == s);
+ UNIT_ASSERT(batch.PackedData == b);
TString str;
batch.SerializeTo(str);
auto header = ExtractHeader(str.c_str(), str.size());
diff --git a/ydb/core/persqueue/ut/type_codecs_ut.cpp b/ydb/core/persqueue/ut/type_codecs_ut.cpp
index af0e1e1f53..09f0bb7563 100644
--- a/ydb/core/persqueue/ut/type_codecs_ut.cpp
+++ b/ydb/core/persqueue/ut/type_codecs_ut.cpp
@@ -19,8 +19,8 @@ using TDataRef = NScheme::TDataRef;
Y_UNIT_TEST_SUITE(TTypeCodecsTest) {
void Metrics(const TVector<TDataRef>& values, const ICodec* codec) {
- TAutoPtr<TFlatBlobDataOutputStream> output(new TFlatBlobDataOutputStream());
- auto chunk = codec->MakeChunk(output.Get());
+ TBuffer output;
+ auto chunk = codec->MakeChunk(output);
auto start = TInstant::Now();
for (const auto& value : values) {
@@ -32,11 +32,10 @@ Y_UNIT_TEST_SUITE(TTypeCodecsTest) {
chunk->Seal();
auto duration = TInstant::Now() - start;
- Cerr << "Size: " << output->GetCurrentOffset() << Endl;
+ Cerr << "Size: " << output.Size() << Endl;
Cerr << "Create chunk: " << duration << Endl;
- auto reading = codec->ReadChunk(output->GetBuffer());
- output.Reset(nullptr);
+ auto reading = codec->ReadChunk(output);
start = TInstant::Now();
for (size_t i = 0, size = values.size(); i != size; ++i) {
@@ -55,8 +54,8 @@ Y_UNIT_TEST_SUITE(TTypeCodecsTest) {
}
void TestImpl(const TVector<TDataRef>& values, const ICodec* codec) {
- TAutoPtr<TFlatBlobDataOutputStream> output(new TFlatBlobDataOutputStream());
- auto chunk = codec->MakeChunk(output.Get());
+ TBuffer output;
+ auto chunk = codec->MakeChunk(output);
for (const auto& value : values) {
if (value.IsNull())
chunk->AddNull();
@@ -65,9 +64,8 @@ Y_UNIT_TEST_SUITE(TTypeCodecsTest) {
}
chunk->Seal();
- auto reading = codec->ReadChunk(output->GetBuffer());
+ auto reading = codec->ReadChunk(output);
auto iter = reading->MakeIterator();
- output.Reset(nullptr);
for (size_t i = 0; i != values.size(); ++i) {
const auto& value = values[i];
diff --git a/ydb/core/util/blob_data_stream.h b/ydb/core/util/blob_data_stream.h
deleted file mode 100644
index dac5c0234b..0000000000
--- a/ydb/core/util/blob_data_stream.h
+++ /dev/null
@@ -1,205 +0,0 @@
-#pragma once
-
-#include "defs.h"
-
-#include <util/generic/buffer.h>
-
-////////////////////////////////////////////
-namespace NKikimr {
-
-////////////////////////////////////////////
-class TFlatBlobDataInputStream {
-public:
- //
- TFlatBlobDataInputStream() {}
- TFlatBlobDataInputStream(const TStringBuf& buffer)
- : Buffer(buffer)
- {}
- ~TFlatBlobDataInputStream() {}
-
- //
- const void* ReadAt(ui32 offset, ui32 size) const {
- Y_VERIFY_DEBUG(Buffer.size() >= offset + size);
- Y_UNUSED(size);
-
- return Buffer.data() + offset;
- }
-
- //
- template<typename T>
- const T* ReadAt(ui32 offset) const {
- return (const T*)ReadAt(offset, sizeof(T));
- }
-
-private:
- //
- TStringBuf Buffer;
-};
-
-////////////////////////////////////////////
-class TFlatBlobDataOutputStream {
-public:
- //
- TFlatBlobDataOutputStream()
- {}
- TFlatBlobDataOutputStream(TString bufferToGrab)
- : Buffer(bufferToGrab)
- {}
- ~TFlatBlobDataOutputStream() {}
-
- // IBlobDataStream intarface
- ui32 GetCurrentOffset() const {
- return Buffer.size();
- }
-
- template<typename T>
- void WritePOD(const T& dt) {
- Write((const char*)&dt, (ui32)sizeof(T));
- }
-
- void Write(const char* data, ui32 size) {
- Buffer.AppendNoAlias(data, size);
- }
-
- void WriteAtPosition(ui32 position, const char* data, ui32 size) {
- memcpy((char*)Buffer.data() + position, data, size);
- }
-
- void Write(const TFlatBlobDataOutputStream* anotherStream) {
- TStringBuf buf = anotherStream->CurrentBuffer();
- Write(buf.data(), buf.size());
- }
-
- TStringBuf CurrentBuffer() const {
- return TStringBuf(Buffer.data(), Buffer.size());
- }
-
- const TString& GetBuffer() const {
- return Buffer;
- }
-
- ////////////////////////////////////////////
- class TFutureValueBase {
- public:
- //
- TFutureValueBase(TFlatBlobDataOutputStream* dataStream, ui32 position, ui32 size)
- : DataStream(dataStream)
- , Position(position)
- , Size(size)
- {}
- ~TFutureValueBase() {}
-
- //
- bool IsValid() const { return DataStream != nullptr; }
- explicit operator bool() const { return IsValid(); }
- bool operator !() const { return !IsValid(); }
-
- protected:
- //
- TFlatBlobDataOutputStream* DataStream;
- ui32 Position;
- ui32 Size;
- };
-
-
- ////////////////////////////////////////////
- class TFutureValue : public TFutureValueBase {
- public:
- //
- TFutureValue(TFlatBlobDataOutputStream* dataStream = nullptr, ui32 position = 0, ui32 size = 0)
- : TFutureValueBase(dataStream, position, size)
- {}
- ~TFutureValue() {}
-
- //
- template<typename T>
- void SetValue(const T& value) {
- Y_VERIFY(sizeof(T) == Size);
- DataStream->WriteAtPosition(Position, (const char*)&value, sizeof(T));
- }
- };
-
- ////////////////////////////////////////////
- template<typename T>
- class TFutureValuePOD : public TFutureValueBase {
- public:
- //
- TFutureValuePOD(TFlatBlobDataOutputStream* dataStream = nullptr, ui32 position = 0, ui32 size = 0)
- : TFutureValueBase(dataStream, position, size)
- {}
- ~TFutureValuePOD() {}
-
- //
- void SetValue(const T& value) {
- Y_VERIFY(sizeof(T) == Size);
- DataStream->WriteAtPosition(Position, (const char*)&value, sizeof(T));
- }
- };
-
- //
- TFutureValue FutureWrite(const char* data, ui32 size) {
- ui32 currentOffset = GetCurrentOffset();
- Write(data, size);
- return TFutureValue(this, currentOffset, size);
- }
-
- template <typename T>
- TFutureValuePOD<T> FutureWritePOD(const T& dt) {
- ui32 currentOffset = GetCurrentOffset();
- WritePOD<T>(dt);
- return TFutureValuePOD<T>(this, currentOffset, (ui32)sizeof(T));
- }
-
-private:
- //
- TString Buffer;
-};
-
-////////////////////////////////////////////
-class TBufferReader {
-public:
- //
- TBufferReader(const TStringBuf& buffer)
- : Buffer(buffer)
- , Offset(0)
- {}
- ~TBufferReader() {}
-
- void SetOffset(ui32 newOffset) {
- Offset = newOffset;
- Y_VERIFY_DEBUG(Offset <= Buffer.size());
- }
- void AddOffset(ui32 incOffset) {
- Offset += incOffset;
- Y_VERIFY_DEBUG(Offset <= Buffer.size());
- }
-
- //
- template<typename T>
- const T* ReadData() {
- Y_VERIFY_DEBUG(Offset + sizeof(T) <= Buffer.size());
- T* retVal = (T*) (Buffer.data() + Offset);
- Offset += sizeof(T);
- return retVal;
- }
-
- TStringBuf ReadData(ui32 len) {
- Y_VERIFY_DEBUG(Offset + len <= Buffer.size());
- TStringBuf retVal(Buffer.data() + Offset, len);
- Offset += len;
- return retVal;
- }
-
- TStringBuf GetBuffer() const {
- Y_VERIFY_DEBUG(Buffer.size() > Offset);
- return TStringBuf(Buffer.data() + Offset, Buffer.size() - Offset);
- }
-
-private:
- //
- TStringBuf Buffer;
- ui32 Offset;
-};
-
-} // end of the NKikimr namespace
-