diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-10-12 11:47:25 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-10-12 12:28:03 +0300 |
commit | dc3f01d5126d446177c38dbed4ad0fd1575d49a6 (patch) | |
tree | a139533b5247f8efa844fa04ebc02aac76b8653e | |
parent | 385d8b0c79247b98a4c69ed43cbd4198bd5432a9 (diff) | |
download | ydb-dc3f01d5126d446177c38dbed4ad0fd1575d49a6.tar.gz |
KIKIMR-19093: fix blobs splitting
-rw-r--r-- | ydb/core/tx/columnshard/splitter/batch_slice.cpp | 15 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/splitter/simple.cpp | 156 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/splitter/simple.h | 6 |
3 files changed, 140 insertions, 37 deletions
diff --git a/ydb/core/tx/columnshard/splitter/batch_slice.cpp b/ydb/core/tx/columnshard/splitter/batch_slice.cpp index 1c76f4a18db..0a3dc0ff0f6 100644 --- a/ydb/core/tx/columnshard/splitter/batch_slice.cpp +++ b/ydb/core/tx/columnshard/splitter/batch_slice.cpp @@ -49,16 +49,21 @@ bool TGeneralSerializedSlice::GroupBlobs(std::vector<TSplittedBlob>& blobs) { Y_ABORT_UNLESS((i64)chunksInProgress[i]->GetPackedSize() > Settings.GetMinBlobSize() - partSize); Y_ABORT_UNLESS(otherSize - (Settings.GetMinBlobSize() - partSize) >= Settings.GetMinBlobSize()); - Counters->BySizeSplitter.OnTrashSerialized(chunksInProgress[i]->GetPackedSize()); - const std::vector<ui64> sizes = {(ui64)(Settings.GetMinBlobSize() - partSize)}; - std::vector<IPortionColumnChunk::TPtr> newChunks = chunksInProgress[i]->InternalSplit(Schema->GetColumnSaver(chunksInProgress[i]->GetColumnId()), Counters, sizes); - chunksInProgress.erase(chunksInProgress.begin() + i); - chunksInProgress.insert(chunksInProgress.begin() + i, newChunks.begin(), newChunks.end()); + std::vector<IPortionColumnChunk::TPtr> newChunks; + const bool splittable = chunksInProgress[i]->GetRecordsCount() > 1; + if (splittable) { + Counters->BySizeSplitter.OnTrashSerialized(chunksInProgress[i]->GetPackedSize()); + const std::vector<ui64> sizes = {(ui64)(Settings.GetMinBlobSize() - partSize)}; + newChunks = chunksInProgress[i]->InternalSplit(Schema->GetColumnSaver(chunksInProgress[i]->GetColumnId()), Counters, sizes); + chunksInProgress.erase(chunksInProgress.begin() + i); + chunksInProgress.insert(chunksInProgress.begin() + i, newChunks.begin(), newChunks.end()); + } TSplittedBlob newBlob; for (ui32 chunk = 0; chunk <= i; ++chunk) { newBlob.Take(chunksInProgress[chunk]); } + AFL_VERIFY(splittable || newBlob.GetSize() < Settings.GetMaxBlobSize())("splittable", splittable)("blob_size", newBlob.GetSize())("max", Settings.GetMaxBlobSize()); if (newBlob.GetSize() < Settings.GetMaxBlobSize()) { chunksInProgress.erase(chunksInProgress.begin(), chunksInProgress.begin() + i + 1); result.emplace_back(std::move(newBlob)); diff --git a/ydb/core/tx/columnshard/splitter/simple.cpp b/ydb/core/tx/columnshard/splitter/simple.cpp index a8343c8d6e5..f6691138703 100644 --- a/ydb/core/tx/columnshard/splitter/simple.cpp +++ b/ydb/core/tx/columnshard/splitter/simple.cpp @@ -1,6 +1,7 @@ #include "simple.h" #include <ydb/core/formats/arrow/arrow_helpers.h> #include <ydb/core/formats/arrow/common/validation.h> +#include <ydb/core/formats/arrow/size_calcer.h> #include <util/string/join.h> namespace NKikimr::NOlap { @@ -24,41 +25,138 @@ std::vector<TSaverSplittedChunk> TSimpleSplitter::Split(const std::shared_ptr<ar return Split(batch, maxBlobSize); } -std::vector<TSaverSplittedChunk> TSimpleSplitter::Split(const std::shared_ptr<arrow::RecordBatch>& data, const ui32 maxBlobSize) const { - Y_ABORT_UNLESS(data->num_columns() == 1); - ui64 splitFactor = Stats ? Stats->PredictOptimalSplitFactor(data->num_rows(), maxBlobSize).value_or(1) : 1; - while (true) { - Y_ABORT_UNLESS(splitFactor < 100); - std::vector<TSaverSplittedChunk> result; - result.reserve(splitFactor); - bool isCorrect = true; - ui64 serializedDataBytes = 0; - if (splitFactor == 1) { - TString blob = ColumnSaver.Apply(data); - isCorrect = blob.size() < maxBlobSize; - serializedDataBytes += blob.size(); - result.emplace_back(TSaverSplittedChunk(data, std::move(blob))); - } else { - TLinearSplitInfo linearSplitting = TSimpleSplitter::GetLinearSplittingByMax(data->num_rows(), data->num_rows() / splitFactor); - for (auto it = linearSplitting.StartIterator(); it.IsValid(); it.Next()) { - auto slice = data->Slice(it.GetPosition(), it.GetCurrentPackSize()); - result.emplace_back(slice, ColumnSaver.Apply(slice)); - serializedDataBytes += result.back().GetSerializedChunk().size(); - if (result.back().GetSerializedChunk().size() >= maxBlobSize) { - isCorrect = false; - Y_ABORT_UNLESS(!linearSplitting.IsMinimalGranularity()); - break; +class TSplitChunk { +private: + std::shared_ptr<arrow::RecordBatch> Data; + YDB_READONLY_DEF(std::optional<TSaverSplittedChunk>, Result); + ui32 SplitFactor = 0; + ui32 Iterations = 0; + ui32 MaxBlobSize = 8 * 1024 * 1024; + TColumnSaver ColumnSaver; + std::shared_ptr<NColumnShard::TSplitterCounters> Counters; +public: + TSplitChunk(const ui32 baseSplitFactor, const ui32 maxBlobSize, const std::shared_ptr<arrow::RecordBatch>& data, const TColumnSaver& columnSaver, const std::shared_ptr<NColumnShard::TSplitterCounters>& counters) + : Data(data) + , SplitFactor(baseSplitFactor) + , MaxBlobSize(maxBlobSize) + , ColumnSaver(columnSaver) + , Counters(counters) + { + AFL_VERIFY(Data && Data->num_rows()); + AFL_VERIFY(SplitFactor); + } + + TSplitChunk(const ui32 baseSplitFactor, const ui32 maxBlobSize, const std::shared_ptr<arrow::RecordBatch>& data, TString&& serializedData, const TColumnSaver& columnSaver, const std::shared_ptr<NColumnShard::TSplitterCounters>& counters) + : Data(data) + , Result(TSaverSplittedChunk(data, std::move(serializedData))) + , SplitFactor(baseSplitFactor) + , MaxBlobSize(maxBlobSize) + , ColumnSaver(columnSaver) + , Counters(counters) + { + AFL_VERIFY(Data && Data->num_rows()); + AFL_VERIFY(SplitFactor); + } + + std::vector<TSplitChunk> Split() { + while (true) { + AFL_VERIFY(!Result); + AFL_VERIFY(++Iterations < 100); + AFL_VERIFY(SplitFactor <= Data->num_rows())("factor", SplitFactor)("records", Data->num_rows())("iteration", Iterations)("size", NArrow::GetBatchDataSize(Data)); + bool found = false; + std::vector<TSplitChunk> result; + if (SplitFactor == 1) { + TString blob = ColumnSaver.Apply(Data); + if (blob.size() < MaxBlobSize) { + Counters->SimpleSplitter.OnCorrectSerialized(blob.size()); + Result = TSaverSplittedChunk(Data, std::move(blob)); + found = true; + result.emplace_back(*this); + } else { + Counters->SimpleSplitter.OnTrashSerialized(blob.size()); + TSimpleSerializationStat stats(blob.size(), Data->num_rows(), NArrow::GetBatchDataSize(Data)); + SplitFactor = stats.PredictOptimalSplitFactor(Data->num_rows(), MaxBlobSize).value_or(1); + if (SplitFactor == 1) { + SplitFactor = 2; + } + AFL_VERIFY(Data->num_rows() > 1); + } + } else { + TLinearSplitInfo linearSplitting = TSimpleSplitter::GetLinearSplittingByMax(Data->num_rows(), Data->num_rows() / SplitFactor); + TStringBuilder sb; + std::optional<ui32> badStartPosition; + ui32 badBatchRecordsCount = 0; + ui64 badBatchSerializedSize = 0; + ui32 badBatchCount = 0; + for (auto it = linearSplitting.StartIterator(); it.IsValid(); it.Next()) { + auto slice = Data->Slice(it.GetPosition(), it.GetCurrentPackSize()); + TString blob = ColumnSaver.Apply(slice); + if (blob.size() >= MaxBlobSize) { + Counters->SimpleSplitter.OnTrashSerialized(blob.size()); + if (!badStartPosition) { + badStartPosition = it.GetPosition(); + } + badBatchSerializedSize += blob.size(); + badBatchRecordsCount += it.GetCurrentPackSize(); + ++badBatchCount; + Y_ABORT_UNLESS(!linearSplitting.IsMinimalGranularity()); + } else { + Counters->SimpleSplitter.OnCorrectSerialized(blob.size()); + if (badStartPosition) { + AFL_VERIFY(badBatchRecordsCount && badBatchCount)("count", badBatchCount)("records", badBatchRecordsCount); + auto badSlice = Data->Slice(*badStartPosition, badBatchRecordsCount); + TSimpleSerializationStat stats(badBatchSerializedSize, badBatchRecordsCount, Max<ui32>()); + result.emplace_back(std::max<ui32>(stats.PredictOptimalSplitFactor(badBatchRecordsCount, MaxBlobSize).value_or(1), badBatchCount) + 1, MaxBlobSize, badSlice, ColumnSaver, Counters); + badStartPosition = {}; + badBatchRecordsCount = 0; + badBatchCount = 0; + badBatchSerializedSize = 0; + } + found = true; + result.emplace_back(1, MaxBlobSize, slice, std::move(blob), ColumnSaver, Counters); + } + } + if (badStartPosition) { + auto badSlice = Data->Slice(*badStartPosition, badBatchRecordsCount); + TSimpleSerializationStat stats(badBatchSerializedSize, badBatchRecordsCount, Max<ui32>()); + result.emplace_back(std::max<ui32>(stats.PredictOptimalSplitFactor(badBatchRecordsCount, MaxBlobSize).value_or(1), badBatchCount) + 1, MaxBlobSize, badSlice, ColumnSaver, Counters); } + ++SplitFactor; } + if (found) { + return result; + } + } + AFL_VERIFY(false); + return {}; + } +}; + +std::vector<TSaverSplittedChunk> TSimpleSplitter::Split(const std::shared_ptr<arrow::RecordBatch>& data, const ui32 maxBlobSize) const { + AFL_VERIFY(data->num_columns() == 1); + AFL_VERIFY(data->num_rows()); + TSplitChunk baseChunk(Stats ? Stats->PredictOptimalSplitFactor(data->num_rows(), maxBlobSize).value_or(1) : 1, maxBlobSize, data, ColumnSaver, Counters); + std::vector<TSplitChunk> chunks = {baseChunk}; + for (auto it = chunks.begin(); it != chunks.end(); ) { + AFL_VERIFY(chunks.size() < 100); + if (!!it->GetResult()) { + ++it; + continue; } - if (isCorrect) { - Counters->SimpleSplitter.OnCorrectSerialized(serializedDataBytes); - return result; + std::vector<TSplitChunk> splitted = it->Split(); + if (splitted.size() == 1) { + *it = splitted.front(); } else { - Counters->SimpleSplitter.OnTrashSerialized(serializedDataBytes); + it = chunks.insert(it, splitted.begin(), splitted.end()); + chunks.erase(it + splitted.size()); } - ++splitFactor; } + std::vector<TSaverSplittedChunk> result; + for (auto&& i : chunks) { + AFL_VERIFY(i.GetResult()); + result.emplace_back(*i.GetResult()); + } + return result; } std::vector<TSaverSplittedChunk> TSimpleSplitter::SplitByRecordsCount(std::shared_ptr<arrow::RecordBatch> data, const std::vector<ui64>& recordsCount) const { diff --git a/ydb/core/tx/columnshard/splitter/simple.h b/ydb/core/tx/columnshard/splitter/simple.h index 773fc805612..2366cdd4416 100644 --- a/ydb/core/tx/columnshard/splitter/simple.h +++ b/ydb/core/tx/columnshard/splitter/simple.h @@ -63,9 +63,9 @@ public: , PackSize(packSize) , ObjectsCount(objectsCount) { - Y_ABORT_UNLESS(objectsCount >= packsCount); - Y_ABORT_UNLESS(PackSize); - Y_ABORT_UNLESS(PacksCount); + AFL_VERIFY(objectsCount >= packsCount)("objects_count", objectsCount)("packs_count", packsCount); + AFL_VERIFY(PackSize); + AFL_VERIFY(PacksCount); } class TIterator { |