aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2023-10-12 11:47:25 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2023-10-12 12:28:03 +0300
commitdc3f01d5126d446177c38dbed4ad0fd1575d49a6 (patch)
treea139533b5247f8efa844fa04ebc02aac76b8653e
parent385d8b0c79247b98a4c69ed43cbd4198bd5432a9 (diff)
downloadydb-dc3f01d5126d446177c38dbed4ad0fd1575d49a6.tar.gz
KIKIMR-19093: fix blobs splitting
-rw-r--r--ydb/core/tx/columnshard/splitter/batch_slice.cpp15
-rw-r--r--ydb/core/tx/columnshard/splitter/simple.cpp156
-rw-r--r--ydb/core/tx/columnshard/splitter/simple.h6
3 files changed, 140 insertions, 37 deletions
diff --git a/ydb/core/tx/columnshard/splitter/batch_slice.cpp b/ydb/core/tx/columnshard/splitter/batch_slice.cpp
index 1c76f4a18db..0a3dc0ff0f6 100644
--- a/ydb/core/tx/columnshard/splitter/batch_slice.cpp
+++ b/ydb/core/tx/columnshard/splitter/batch_slice.cpp
@@ -49,16 +49,21 @@ bool TGeneralSerializedSlice::GroupBlobs(std::vector<TSplittedBlob>& blobs) {
Y_ABORT_UNLESS((i64)chunksInProgress[i]->GetPackedSize() > Settings.GetMinBlobSize() - partSize);
Y_ABORT_UNLESS(otherSize - (Settings.GetMinBlobSize() - partSize) >= Settings.GetMinBlobSize());
- Counters->BySizeSplitter.OnTrashSerialized(chunksInProgress[i]->GetPackedSize());
- const std::vector<ui64> sizes = {(ui64)(Settings.GetMinBlobSize() - partSize)};
- std::vector<IPortionColumnChunk::TPtr> newChunks = chunksInProgress[i]->InternalSplit(Schema->GetColumnSaver(chunksInProgress[i]->GetColumnId()), Counters, sizes);
- chunksInProgress.erase(chunksInProgress.begin() + i);
- chunksInProgress.insert(chunksInProgress.begin() + i, newChunks.begin(), newChunks.end());
+ std::vector<IPortionColumnChunk::TPtr> newChunks;
+ const bool splittable = chunksInProgress[i]->GetRecordsCount() > 1;
+ if (splittable) {
+ Counters->BySizeSplitter.OnTrashSerialized(chunksInProgress[i]->GetPackedSize());
+ const std::vector<ui64> sizes = {(ui64)(Settings.GetMinBlobSize() - partSize)};
+ newChunks = chunksInProgress[i]->InternalSplit(Schema->GetColumnSaver(chunksInProgress[i]->GetColumnId()), Counters, sizes);
+ chunksInProgress.erase(chunksInProgress.begin() + i);
+ chunksInProgress.insert(chunksInProgress.begin() + i, newChunks.begin(), newChunks.end());
+ }
TSplittedBlob newBlob;
for (ui32 chunk = 0; chunk <= i; ++chunk) {
newBlob.Take(chunksInProgress[chunk]);
}
+ AFL_VERIFY(splittable || newBlob.GetSize() < Settings.GetMaxBlobSize())("splittable", splittable)("blob_size", newBlob.GetSize())("max", Settings.GetMaxBlobSize());
if (newBlob.GetSize() < Settings.GetMaxBlobSize()) {
chunksInProgress.erase(chunksInProgress.begin(), chunksInProgress.begin() + i + 1);
result.emplace_back(std::move(newBlob));
diff --git a/ydb/core/tx/columnshard/splitter/simple.cpp b/ydb/core/tx/columnshard/splitter/simple.cpp
index a8343c8d6e5..f6691138703 100644
--- a/ydb/core/tx/columnshard/splitter/simple.cpp
+++ b/ydb/core/tx/columnshard/splitter/simple.cpp
@@ -1,6 +1,7 @@
#include "simple.h"
#include <ydb/core/formats/arrow/arrow_helpers.h>
#include <ydb/core/formats/arrow/common/validation.h>
+#include <ydb/core/formats/arrow/size_calcer.h>
#include <util/string/join.h>
namespace NKikimr::NOlap {
@@ -24,41 +25,138 @@ std::vector<TSaverSplittedChunk> TSimpleSplitter::Split(const std::shared_ptr<ar
return Split(batch, maxBlobSize);
}
-std::vector<TSaverSplittedChunk> TSimpleSplitter::Split(const std::shared_ptr<arrow::RecordBatch>& data, const ui32 maxBlobSize) const {
- Y_ABORT_UNLESS(data->num_columns() == 1);
- ui64 splitFactor = Stats ? Stats->PredictOptimalSplitFactor(data->num_rows(), maxBlobSize).value_or(1) : 1;
- while (true) {
- Y_ABORT_UNLESS(splitFactor < 100);
- std::vector<TSaverSplittedChunk> result;
- result.reserve(splitFactor);
- bool isCorrect = true;
- ui64 serializedDataBytes = 0;
- if (splitFactor == 1) {
- TString blob = ColumnSaver.Apply(data);
- isCorrect = blob.size() < maxBlobSize;
- serializedDataBytes += blob.size();
- result.emplace_back(TSaverSplittedChunk(data, std::move(blob)));
- } else {
- TLinearSplitInfo linearSplitting = TSimpleSplitter::GetLinearSplittingByMax(data->num_rows(), data->num_rows() / splitFactor);
- for (auto it = linearSplitting.StartIterator(); it.IsValid(); it.Next()) {
- auto slice = data->Slice(it.GetPosition(), it.GetCurrentPackSize());
- result.emplace_back(slice, ColumnSaver.Apply(slice));
- serializedDataBytes += result.back().GetSerializedChunk().size();
- if (result.back().GetSerializedChunk().size() >= maxBlobSize) {
- isCorrect = false;
- Y_ABORT_UNLESS(!linearSplitting.IsMinimalGranularity());
- break;
+class TSplitChunk {
+private:
+ std::shared_ptr<arrow::RecordBatch> Data;
+ YDB_READONLY_DEF(std::optional<TSaverSplittedChunk>, Result);
+ ui32 SplitFactor = 0;
+ ui32 Iterations = 0;
+ ui32 MaxBlobSize = 8 * 1024 * 1024;
+ TColumnSaver ColumnSaver;
+ std::shared_ptr<NColumnShard::TSplitterCounters> Counters;
+public:
+ TSplitChunk(const ui32 baseSplitFactor, const ui32 maxBlobSize, const std::shared_ptr<arrow::RecordBatch>& data, const TColumnSaver& columnSaver, const std::shared_ptr<NColumnShard::TSplitterCounters>& counters)
+ : Data(data)
+ , SplitFactor(baseSplitFactor)
+ , MaxBlobSize(maxBlobSize)
+ , ColumnSaver(columnSaver)
+ , Counters(counters)
+ {
+ AFL_VERIFY(Data && Data->num_rows());
+ AFL_VERIFY(SplitFactor);
+ }
+
+ TSplitChunk(const ui32 baseSplitFactor, const ui32 maxBlobSize, const std::shared_ptr<arrow::RecordBatch>& data, TString&& serializedData, const TColumnSaver& columnSaver, const std::shared_ptr<NColumnShard::TSplitterCounters>& counters)
+ : Data(data)
+ , Result(TSaverSplittedChunk(data, std::move(serializedData)))
+ , SplitFactor(baseSplitFactor)
+ , MaxBlobSize(maxBlobSize)
+ , ColumnSaver(columnSaver)
+ , Counters(counters)
+ {
+ AFL_VERIFY(Data && Data->num_rows());
+ AFL_VERIFY(SplitFactor);
+ }
+
+ std::vector<TSplitChunk> Split() {
+ while (true) {
+ AFL_VERIFY(!Result);
+ AFL_VERIFY(++Iterations < 100);
+ AFL_VERIFY(SplitFactor <= Data->num_rows())("factor", SplitFactor)("records", Data->num_rows())("iteration", Iterations)("size", NArrow::GetBatchDataSize(Data));
+ bool found = false;
+ std::vector<TSplitChunk> result;
+ if (SplitFactor == 1) {
+ TString blob = ColumnSaver.Apply(Data);
+ if (blob.size() < MaxBlobSize) {
+ Counters->SimpleSplitter.OnCorrectSerialized(blob.size());
+ Result = TSaverSplittedChunk(Data, std::move(blob));
+ found = true;
+ result.emplace_back(*this);
+ } else {
+ Counters->SimpleSplitter.OnTrashSerialized(blob.size());
+ TSimpleSerializationStat stats(blob.size(), Data->num_rows(), NArrow::GetBatchDataSize(Data));
+ SplitFactor = stats.PredictOptimalSplitFactor(Data->num_rows(), MaxBlobSize).value_or(1);
+ if (SplitFactor == 1) {
+ SplitFactor = 2;
+ }
+ AFL_VERIFY(Data->num_rows() > 1);
+ }
+ } else {
+ TLinearSplitInfo linearSplitting = TSimpleSplitter::GetLinearSplittingByMax(Data->num_rows(), Data->num_rows() / SplitFactor);
+ TStringBuilder sb;
+ std::optional<ui32> badStartPosition;
+ ui32 badBatchRecordsCount = 0;
+ ui64 badBatchSerializedSize = 0;
+ ui32 badBatchCount = 0;
+ for (auto it = linearSplitting.StartIterator(); it.IsValid(); it.Next()) {
+ auto slice = Data->Slice(it.GetPosition(), it.GetCurrentPackSize());
+ TString blob = ColumnSaver.Apply(slice);
+ if (blob.size() >= MaxBlobSize) {
+ Counters->SimpleSplitter.OnTrashSerialized(blob.size());
+ if (!badStartPosition) {
+ badStartPosition = it.GetPosition();
+ }
+ badBatchSerializedSize += blob.size();
+ badBatchRecordsCount += it.GetCurrentPackSize();
+ ++badBatchCount;
+ Y_ABORT_UNLESS(!linearSplitting.IsMinimalGranularity());
+ } else {
+ Counters->SimpleSplitter.OnCorrectSerialized(blob.size());
+ if (badStartPosition) {
+ AFL_VERIFY(badBatchRecordsCount && badBatchCount)("count", badBatchCount)("records", badBatchRecordsCount);
+ auto badSlice = Data->Slice(*badStartPosition, badBatchRecordsCount);
+ TSimpleSerializationStat stats(badBatchSerializedSize, badBatchRecordsCount, Max<ui32>());
+ result.emplace_back(std::max<ui32>(stats.PredictOptimalSplitFactor(badBatchRecordsCount, MaxBlobSize).value_or(1), badBatchCount) + 1, MaxBlobSize, badSlice, ColumnSaver, Counters);
+ badStartPosition = {};
+ badBatchRecordsCount = 0;
+ badBatchCount = 0;
+ badBatchSerializedSize = 0;
+ }
+ found = true;
+ result.emplace_back(1, MaxBlobSize, slice, std::move(blob), ColumnSaver, Counters);
+ }
+ }
+ if (badStartPosition) {
+ auto badSlice = Data->Slice(*badStartPosition, badBatchRecordsCount);
+ TSimpleSerializationStat stats(badBatchSerializedSize, badBatchRecordsCount, Max<ui32>());
+ result.emplace_back(std::max<ui32>(stats.PredictOptimalSplitFactor(badBatchRecordsCount, MaxBlobSize).value_or(1), badBatchCount) + 1, MaxBlobSize, badSlice, ColumnSaver, Counters);
}
+ ++SplitFactor;
}
+ if (found) {
+ return result;
+ }
+ }
+ AFL_VERIFY(false);
+ return {};
+ }
+};
+
+std::vector<TSaverSplittedChunk> TSimpleSplitter::Split(const std::shared_ptr<arrow::RecordBatch>& data, const ui32 maxBlobSize) const {
+ AFL_VERIFY(data->num_columns() == 1);
+ AFL_VERIFY(data->num_rows());
+ TSplitChunk baseChunk(Stats ? Stats->PredictOptimalSplitFactor(data->num_rows(), maxBlobSize).value_or(1) : 1, maxBlobSize, data, ColumnSaver, Counters);
+ std::vector<TSplitChunk> chunks = {baseChunk};
+ for (auto it = chunks.begin(); it != chunks.end(); ) {
+ AFL_VERIFY(chunks.size() < 100);
+ if (!!it->GetResult()) {
+ ++it;
+ continue;
}
- if (isCorrect) {
- Counters->SimpleSplitter.OnCorrectSerialized(serializedDataBytes);
- return result;
+ std::vector<TSplitChunk> splitted = it->Split();
+ if (splitted.size() == 1) {
+ *it = splitted.front();
} else {
- Counters->SimpleSplitter.OnTrashSerialized(serializedDataBytes);
+ it = chunks.insert(it, splitted.begin(), splitted.end());
+ chunks.erase(it + splitted.size());
}
- ++splitFactor;
}
+ std::vector<TSaverSplittedChunk> result;
+ for (auto&& i : chunks) {
+ AFL_VERIFY(i.GetResult());
+ result.emplace_back(*i.GetResult());
+ }
+ return result;
}
std::vector<TSaverSplittedChunk> TSimpleSplitter::SplitByRecordsCount(std::shared_ptr<arrow::RecordBatch> data, const std::vector<ui64>& recordsCount) const {
diff --git a/ydb/core/tx/columnshard/splitter/simple.h b/ydb/core/tx/columnshard/splitter/simple.h
index 773fc805612..2366cdd4416 100644
--- a/ydb/core/tx/columnshard/splitter/simple.h
+++ b/ydb/core/tx/columnshard/splitter/simple.h
@@ -63,9 +63,9 @@ public:
, PackSize(packSize)
, ObjectsCount(objectsCount)
{
- Y_ABORT_UNLESS(objectsCount >= packsCount);
- Y_ABORT_UNLESS(PackSize);
- Y_ABORT_UNLESS(PacksCount);
+ AFL_VERIFY(objectsCount >= packsCount)("objects_count", objectsCount)("packs_count", packsCount);
+ AFL_VERIFY(PackSize);
+ AFL_VERIFY(PacksCount);
}
class TIterator {