about | summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author: ivanmorozov333 <111685085+ivanmorozov333@users.noreply.github.com> 2024-01-17 14:22:10 +0300
committer: GitHub <noreply@github.com> 2024-01-17 14:22:10 +0300
commit: a0c44acd6a8dd119921f5ac8ce5d0f7fd474d24c (patch)
tree: f74f5c544ab4dcbf3baa144f621e0e1a63aa85f4
parent: 4fbaac4f0dfebd9a97b942064b6905dfe304f34c (diff)
download: ydb-a0c44acd6a8dd119921f5ac8ce5d0f7fd474d24c.tar.gz
fix stats for splitting (#1079)
* fix stats for splitting
* fix
-rw-r--r-- ydb/core/tx/columnshard/splitter/batch_slice.cpp | 4
-rw-r--r-- ydb/core/tx/columnshard/splitter/simple.cpp | 6
-rw-r--r-- ydb/core/tx/columnshard/splitter/simple.h | 2
-rw-r--r-- ydb/core/tx/columnshard/splitter/stats.cpp | 5
-rw-r--r-- ydb/core/tx/columnshard/splitter/stats.h | 65
5 files changed, 48 insertions, 34 deletions
diff --git a/ydb/core/tx/columnshard/splitter/batch_slice.cpp b/ydb/core/tx/columnshard/splitter/batch_slice.cpp
index 8c8ea7db218..06ce8cebd56 100644
--- a/ydb/core/tx/columnshard/splitter/batch_slice.cpp
+++ b/ydb/core/tx/columnshard/splitter/batch_slice.cpp
@@ -142,7 +142,9 @@ TBatchSerializedSlice::TBatchSerializedSlice(std::shared_ptr<arrow::RecordBatch>
auto columnSaver = schema->GetColumnSaver(c.GetColumnId());
auto stats = schema->GetColumnSerializationStats(c.GetColumnId());
TSimpleSplitter splitter(columnSaver, Counters);
- splitter.SetStats(stats);
+ if (stats) {
+ splitter.SetStats(*stats);
+ }
std::vector<IPortionColumnChunk::TPtr> chunks;
for (auto&& i : splitter.Split(i, c.GetField(), Settings.GetMaxBlobSize())) {
chunks.emplace_back(std::make_shared<TSplittedColumnChunk>(c.GetColumnId(), i, Schema));
diff --git a/ydb/core/tx/columnshard/splitter/simple.cpp b/ydb/core/tx/columnshard/splitter/simple.cpp
index f6691138703..690309c1903 100644
--- a/ydb/core/tx/columnshard/splitter/simple.cpp
+++ b/ydb/core/tx/columnshard/splitter/simple.cpp
@@ -74,7 +74,7 @@ public:
result.emplace_back(*this);
} else {
Counters->SimpleSplitter.OnTrashSerialized(blob.size());
- TSimpleSerializationStat stats(blob.size(), Data->num_rows(), NArrow::GetBatchDataSize(Data));
+ TBatchSerializationStat stats(blob.size(), Data->num_rows(), NArrow::GetBatchDataSize(Data));
SplitFactor = stats.PredictOptimalSplitFactor(Data->num_rows(), MaxBlobSize).value_or(1);
if (SplitFactor == 1) {
SplitFactor = 2;
@@ -105,7 +105,7 @@ public:
if (badStartPosition) {
AFL_VERIFY(badBatchRecordsCount && badBatchCount)("count", badBatchCount)("records", badBatchRecordsCount);
auto badSlice = Data->Slice(*badStartPosition, badBatchRecordsCount);
- TSimpleSerializationStat stats(badBatchSerializedSize, badBatchRecordsCount, Max<ui32>());
+ TBatchSerializationStat stats(badBatchSerializedSize, badBatchRecordsCount, Max<ui32>());
result.emplace_back(std::max<ui32>(stats.PredictOptimalSplitFactor(badBatchRecordsCount, MaxBlobSize).value_or(1), badBatchCount) + 1, MaxBlobSize, badSlice, ColumnSaver, Counters);
badStartPosition = {};
badBatchRecordsCount = 0;
@@ -118,7 +118,7 @@ public:
}
if (badStartPosition) {
auto badSlice = Data->Slice(*badStartPosition, badBatchRecordsCount);
- TSimpleSerializationStat stats(badBatchSerializedSize, badBatchRecordsCount, Max<ui32>());
+ TBatchSerializationStat stats(badBatchSerializedSize, badBatchRecordsCount, Max<ui32>());
result.emplace_back(std::max<ui32>(stats.PredictOptimalSplitFactor(badBatchRecordsCount, MaxBlobSize).value_or(1), badBatchCount) + 1, MaxBlobSize, badSlice, ColumnSaver, Counters);
}
++SplitFactor;
diff --git a/ydb/core/tx/columnshard/splitter/simple.h b/ydb/core/tx/columnshard/splitter/simple.h
index 6746f4c098e..1878e72c6ae 100644
--- a/ydb/core/tx/columnshard/splitter/simple.h
+++ b/ydb/core/tx/columnshard/splitter/simple.h
@@ -110,7 +110,7 @@ public:
class TSimpleSplitter {
private:
TColumnSaver ColumnSaver;
- YDB_ACCESSOR_DEF(std::optional<TColumnSerializationStat>, Stats);
+ YDB_ACCESSOR_DEF(std::optional<TBatchSerializationStat>, Stats);
std::shared_ptr<NColumnShard::TSplitterCounters> Counters;
public:
explicit TSimpleSplitter(const TColumnSaver& columnSaver, std::shared_ptr<NColumnShard::TSplitterCounters> counters)
diff --git a/ydb/core/tx/columnshard/splitter/stats.cpp b/ydb/core/tx/columnshard/splitter/stats.cpp
index 23f1e0c60d7..a028b4a9e18 100644
--- a/ydb/core/tx/columnshard/splitter/stats.cpp
+++ b/ydb/core/tx/columnshard/splitter/stats.cpp
@@ -10,10 +10,9 @@ std::optional<TBatchSerializationStat> TSerializationStats::GetStatsForRecordBat
if (!columnInfo) {
return {};
} else if (!result) {
- result = TBatchSerializationStat(*columnInfo);
- } else {
- result->Merge(*columnInfo);
+ result = TBatchSerializationStat();
}
+ result->Merge(*columnInfo);
}
return result;
}
diff --git a/ydb/core/tx/columnshard/splitter/stats.h b/ydb/core/tx/columnshard/splitter/stats.h
index 4ca2f868bac..6c7c5e154d6 100644
--- a/ydb/core/tx/columnshard/splitter/stats.h
+++ b/ydb/core/tx/columnshard/splitter/stats.h
@@ -30,6 +30,15 @@ public:
Y_ABORT_UNLESS(RawBytes);
}
+ double GetSerializedBytesPerRecord() const {
+ AFL_VERIFY(RecordsCount);
+ return 1.0 * SerializedBytes / RecordsCount;
+ }
+ double GetRawBytesPerRecord() const {
+ AFL_VERIFY(RecordsCount);
+ return 1.0 * RawBytes / RecordsCount;
+ }
+
ui64 GetSerializedBytes() const{
return SerializedBytes;
}
@@ -54,28 +63,47 @@ public:
Y_ABORT_UNLESS(RawBytes >= stat.RawBytes);
RawBytes -= stat.RawBytes;
}
+};
- double GetPackedRecordSize() const {
- return (double)SerializedBytes / RecordsCount;
+class TBatchSerializationStat {
+protected:
+ double SerializedBytesPerRecord = 0;
+ double RawBytesPerRecord = 0;
+public:
+ TBatchSerializationStat() = default;
+ TBatchSerializationStat(const ui64 bytes, const ui64 recordsCount, const ui64 rawBytes) {
+ Y_ABORT_UNLESS(recordsCount);
+ SerializedBytesPerRecord = 1.0 * bytes / recordsCount;
+ RawBytesPerRecord = 1.0 * rawBytes / recordsCount;
+ }
+
+ TBatchSerializationStat(const TSimpleSerializationStat& simple) {
+ SerializedBytesPerRecord = simple.GetSerializedBytesPerRecord();
+ RawBytesPerRecord = simple.GetRawBytesPerRecord();
+ }
+
+ void Merge(const TSimpleSerializationStat& item) {
+ SerializedBytesPerRecord += item.GetSerializedBytesPerRecord();
+ RawBytesPerRecord += item.GetRawBytesPerRecord();
}
std::optional<ui64> PredictOptimalPackRecordsCount(const ui64 recordsCount, const ui64 blobSize) const {
- if (!RecordsCount) {
+ if (!SerializedBytesPerRecord) {
return {};
}
- const ui64 fullSize = 1.0 * recordsCount / RecordsCount * SerializedBytes;
+ const ui64 fullSize = 1.0 * recordsCount * SerializedBytesPerRecord;
if (fullSize < blobSize) {
return recordsCount;
} else {
- return std::floor(1.0 * blobSize / SerializedBytes * RecordsCount);
+ return std::floor(1.0 * blobSize / SerializedBytesPerRecord);
}
}
std::optional<ui64> PredictOptimalSplitFactor(const ui64 recordsCount, const ui64 blobSize) const {
- if (!RecordsCount) {
+ if (!SerializedBytesPerRecord) {
return {};
}
- const ui64 fullSize = 1.0 * recordsCount / RecordsCount * SerializedBytes;
+ const ui64 fullSize = 1.0 * recordsCount * SerializedBytesPerRecord;
if (fullSize < blobSize) {
return 1;
} else {
@@ -84,25 +112,6 @@ public:
}
};
-class TBatchSerializationStat: public TSimpleSerializationStat {
-private:
- using TBase = TSimpleSerializationStat;
-public:
- using TBase::TBase;
- TBatchSerializationStat(const TSimpleSerializationStat& item)
- : TBase(item)
- {
-
- }
-
- void Merge(const TSimpleSerializationStat& item) {
- SerializedBytes += item.GetSerializedBytes();
- RawBytes += item.GetRawBytes();
- AFL_VERIFY(RecordsCount == item.GetRecordsCount())("self_count", RecordsCount)("new_count", item.GetRecordsCount());
- }
-
-};
-
class TColumnSerializationStat: public TSimpleSerializationStat {
private:
YDB_READONLY(ui32, ColumnId, 0);
@@ -114,6 +123,10 @@ public:
}
+ double GetPackedRecordSize() const {
+ return (double)SerializedBytes / RecordsCount;
+ }
+
TColumnSerializationStat RecalcForRecordsCount(const ui64 recordsCount) const {
TColumnSerializationStat result(ColumnId, ColumnName);
result.Merge(TSimpleSerializationStat(SerializedBytes / RecordsCount * recordsCount, recordsCount, RawBytes / RecordsCount * recordsCount));