diff options
author | ivanmorozov333 <111685085+ivanmorozov333@users.noreply.github.com> | 2024-01-17 14:22:10 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-01-17 14:22:10 +0300 |
commit | a0c44acd6a8dd119921f5ac8ce5d0f7fd474d24c (patch) | |
tree | f74f5c544ab4dcbf3baa144f621e0e1a63aa85f4 | |
parent | 4fbaac4f0dfebd9a97b942064b6905dfe304f34c (diff) | |
download | ydb-a0c44acd6a8dd119921f5ac8ce5d0f7fd474d24c.tar.gz |
fix stats for splitting (#1079)
* fix stats for splitting
* fix
-rw-r--r-- | ydb/core/tx/columnshard/splitter/batch_slice.cpp | 4 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/splitter/simple.cpp | 6 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/splitter/simple.h | 2 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/splitter/stats.cpp | 5 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/splitter/stats.h | 65 |
5 files changed, 48 insertions, 34 deletions
diff --git a/ydb/core/tx/columnshard/splitter/batch_slice.cpp b/ydb/core/tx/columnshard/splitter/batch_slice.cpp index 8c8ea7db218..06ce8cebd56 100644 --- a/ydb/core/tx/columnshard/splitter/batch_slice.cpp +++ b/ydb/core/tx/columnshard/splitter/batch_slice.cpp @@ -142,7 +142,9 @@ TBatchSerializedSlice::TBatchSerializedSlice(std::shared_ptr<arrow::RecordBatch> auto columnSaver = schema->GetColumnSaver(c.GetColumnId()); auto stats = schema->GetColumnSerializationStats(c.GetColumnId()); TSimpleSplitter splitter(columnSaver, Counters); - splitter.SetStats(stats); + if (stats) { + splitter.SetStats(*stats); + } std::vector<IPortionColumnChunk::TPtr> chunks; for (auto&& i : splitter.Split(i, c.GetField(), Settings.GetMaxBlobSize())) { chunks.emplace_back(std::make_shared<TSplittedColumnChunk>(c.GetColumnId(), i, Schema)); diff --git a/ydb/core/tx/columnshard/splitter/simple.cpp b/ydb/core/tx/columnshard/splitter/simple.cpp index f6691138703..690309c1903 100644 --- a/ydb/core/tx/columnshard/splitter/simple.cpp +++ b/ydb/core/tx/columnshard/splitter/simple.cpp @@ -74,7 +74,7 @@ public: result.emplace_back(*this); } else { Counters->SimpleSplitter.OnTrashSerialized(blob.size()); - TSimpleSerializationStat stats(blob.size(), Data->num_rows(), NArrow::GetBatchDataSize(Data)); + TBatchSerializationStat stats(blob.size(), Data->num_rows(), NArrow::GetBatchDataSize(Data)); SplitFactor = stats.PredictOptimalSplitFactor(Data->num_rows(), MaxBlobSize).value_or(1); if (SplitFactor == 1) { SplitFactor = 2; @@ -105,7 +105,7 @@ public: if (badStartPosition) { AFL_VERIFY(badBatchRecordsCount && badBatchCount)("count", badBatchCount)("records", badBatchRecordsCount); auto badSlice = Data->Slice(*badStartPosition, badBatchRecordsCount); - TSimpleSerializationStat stats(badBatchSerializedSize, badBatchRecordsCount, Max<ui32>()); + TBatchSerializationStat stats(badBatchSerializedSize, badBatchRecordsCount, Max<ui32>()); result.emplace_back(std::max<ui32>(stats.PredictOptimalSplitFactor(badBatchRecordsCount, MaxBlobSize).value_or(1), badBatchCount) + 1, MaxBlobSize, badSlice, ColumnSaver, Counters); badStartPosition = {}; badBatchRecordsCount = 0; @@ -118,7 +118,7 @@ public: } if (badStartPosition) { auto badSlice = Data->Slice(*badStartPosition, badBatchRecordsCount); - TSimpleSerializationStat stats(badBatchSerializedSize, badBatchRecordsCount, Max<ui32>()); + TBatchSerializationStat stats(badBatchSerializedSize, badBatchRecordsCount, Max<ui32>()); result.emplace_back(std::max<ui32>(stats.PredictOptimalSplitFactor(badBatchRecordsCount, MaxBlobSize).value_or(1), badBatchCount) + 1, MaxBlobSize, badSlice, ColumnSaver, Counters); } ++SplitFactor; diff --git a/ydb/core/tx/columnshard/splitter/simple.h b/ydb/core/tx/columnshard/splitter/simple.h index 6746f4c098e..1878e72c6ae 100644 --- a/ydb/core/tx/columnshard/splitter/simple.h +++ b/ydb/core/tx/columnshard/splitter/simple.h @@ -110,7 +110,7 @@ public: class TSimpleSplitter { private: TColumnSaver ColumnSaver; - YDB_ACCESSOR_DEF(std::optional<TColumnSerializationStat>, Stats); + YDB_ACCESSOR_DEF(std::optional<TBatchSerializationStat>, Stats); std::shared_ptr<NColumnShard::TSplitterCounters> Counters; public: explicit TSimpleSplitter(const TColumnSaver& columnSaver, std::shared_ptr<NColumnShard::TSplitterCounters> counters) diff --git a/ydb/core/tx/columnshard/splitter/stats.cpp b/ydb/core/tx/columnshard/splitter/stats.cpp index 23f1e0c60d7..a028b4a9e18 100644 --- a/ydb/core/tx/columnshard/splitter/stats.cpp +++ b/ydb/core/tx/columnshard/splitter/stats.cpp @@ -10,10 +10,9 @@ std::optional<TBatchSerializationStat> TSerializationStats::GetStatsForRecordBat if (!columnInfo) { return {}; } else if (!result) { - result = TBatchSerializationStat(*columnInfo); - } else { - result->Merge(*columnInfo); + result = TBatchSerializationStat(); } + result->Merge(*columnInfo); } return result; } diff --git a/ydb/core/tx/columnshard/splitter/stats.h b/ydb/core/tx/columnshard/splitter/stats.h index 4ca2f868bac..6c7c5e154d6 100644 --- a/ydb/core/tx/columnshard/splitter/stats.h +++ b/ydb/core/tx/columnshard/splitter/stats.h @@ -30,6 +30,15 @@ public: Y_ABORT_UNLESS(RawBytes); } + double GetSerializedBytesPerRecord() const { + AFL_VERIFY(RecordsCount); + return 1.0 * SerializedBytes / RecordsCount; + } + double GetRawBytesPerRecord() const { + AFL_VERIFY(RecordsCount); + return 1.0 * RawBytes / RecordsCount; + } + ui64 GetSerializedBytes() const{ return SerializedBytes; } @@ -54,28 +63,47 @@ public: Y_ABORT_UNLESS(RawBytes >= stat.RawBytes); RawBytes -= stat.RawBytes; } +}; - double GetPackedRecordSize() const { - return (double)SerializedBytes / RecordsCount; +class TBatchSerializationStat { +protected: + double SerializedBytesPerRecord = 0; + double RawBytesPerRecord = 0; +public: + TBatchSerializationStat() = default; + TBatchSerializationStat(const ui64 bytes, const ui64 recordsCount, const ui64 rawBytes) { + Y_ABORT_UNLESS(recordsCount); + SerializedBytesPerRecord = 1.0 * bytes / recordsCount; + RawBytesPerRecord = 1.0 * rawBytes / recordsCount; + } + + TBatchSerializationStat(const TSimpleSerializationStat& simple) { + SerializedBytesPerRecord = simple.GetSerializedBytesPerRecord(); + RawBytesPerRecord = simple.GetRawBytesPerRecord(); + } + + void Merge(const TSimpleSerializationStat& item) { + SerializedBytesPerRecord += item.GetSerializedBytesPerRecord(); + RawBytesPerRecord += item.GetRawBytesPerRecord(); } std::optional<ui64> PredictOptimalPackRecordsCount(const ui64 recordsCount, const ui64 blobSize) const { - if (!RecordsCount) { + if (!SerializedBytesPerRecord) { return {}; } - const ui64 fullSize = 1.0 * recordsCount / RecordsCount * SerializedBytes; + const ui64 fullSize = 1.0 * recordsCount * SerializedBytesPerRecord; if (fullSize < blobSize) { return recordsCount; } else { - return std::floor(1.0 * blobSize / SerializedBytes * RecordsCount); + return std::floor(1.0 * blobSize / SerializedBytesPerRecord); } } std::optional<ui64> PredictOptimalSplitFactor(const ui64 recordsCount, const ui64 blobSize) const { - if (!RecordsCount) { + if (!SerializedBytesPerRecord) { return {}; } - const ui64 fullSize = 1.0 * recordsCount / RecordsCount * SerializedBytes; + const ui64 fullSize = 1.0 * recordsCount * SerializedBytesPerRecord; if (fullSize < blobSize) { return 1; } else { @@ -84,25 +112,6 @@ public: } }; -class TBatchSerializationStat: public TSimpleSerializationStat { -private: - using TBase = TSimpleSerializationStat; -public: - using TBase::TBase; - TBatchSerializationStat(const TSimpleSerializationStat& item) - : TBase(item) - { - - } - - void Merge(const TSimpleSerializationStat& item) { - SerializedBytes += item.GetSerializedBytes(); - RawBytes += item.GetRawBytes(); - AFL_VERIFY(RecordsCount == item.GetRecordsCount())("self_count", RecordsCount)("new_count", item.GetRecordsCount()); - } - -}; - class TColumnSerializationStat: public TSimpleSerializationStat { private: YDB_READONLY(ui32, ColumnId, 0); @@ -114,6 +123,10 @@ public: } + double GetPackedRecordSize() const { + return (double)SerializedBytes / RecordsCount; + } + TColumnSerializationStat RecalcForRecordsCount(const ui64 recordsCount) const { TColumnSerializationStat result(ColumnId, ColumnName); result.Merge(TSimpleSerializationStat(SerializedBytes / RecordsCount * recordsCount, recordsCount, RawBytes / RecordsCount * recordsCount)); |