diff options
author | ivanmorozov333 <111685085+ivanmorozov333@users.noreply.github.com> | 2024-01-03 11:37:33 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-01-03 11:37:33 +0300 |
commit | befcdc2455c131b2184d45f8547c8123fc0390f3 (patch) | |
tree | 9bbf5bca174eeca8df3026006a06112f66a55275 | |
parent | 080882eaae1f6d4870b648699b6c7600ebecb36c (diff) | |
download | ydb-befcdc2455c131b2184d45f8547c8123fc0390f3.tar.gz |
KIKIMR-20380: min max portion keys validation (#822)
* validation
* corrections
* clean configuration for splitting
14 files changed, 52 insertions, 41 deletions
diff --git a/ydb/core/formats/arrow/special_keys.cpp b/ydb/core/formats/arrow/special_keys.cpp index 83908f38ad1..0006b339464 100644 --- a/ydb/core/formats/arrow/special_keys.cpp +++ b/ydb/core/formats/arrow/special_keys.cpp @@ -15,7 +15,7 @@ bool TSpecialKeys::DeserializeFromString(const TString& data) { return !!Data; } -std::optional<NKikimr::NArrow::TReplaceKey> TSpecialKeys::GetKeyByIndex(const ui32 position, const std::shared_ptr<arrow::Schema>& schema) const { +NKikimr::NArrow::TReplaceKey TSpecialKeys::GetKeyByIndex(const ui32 position, const std::shared_ptr<arrow::Schema>& schema) const { Y_ABORT_UNLESS(position < Data->num_rows()); if (schema) { return NArrow::TReplaceKey::FromBatch(Data, schema, position); diff --git a/ydb/core/formats/arrow/special_keys.h b/ydb/core/formats/arrow/special_keys.h index e777c5511c2..f157ce089fe 100644 --- a/ydb/core/formats/arrow/special_keys.h +++ b/ydb/core/formats/arrow/special_keys.h @@ -10,7 +10,7 @@ protected: bool DeserializeFromString(const TString& data); - std::optional<TReplaceKey> GetKeyByIndex(const ui32 position, const std::shared_ptr<arrow::Schema>& schema) const; + TReplaceKey GetKeyByIndex(const ui32 position, const std::shared_ptr<arrow::Schema>& schema) const; TSpecialKeys() = default; TSpecialKeys(std::shared_ptr<arrow::RecordBatch> data) @@ -45,10 +45,10 @@ public: std::shared_ptr<TFirstLastSpecialKeys> BuildAccordingToSchemaVerified(const std::shared_ptr<arrow::Schema>& schema) const; - std::optional<TReplaceKey> GetFirst(const std::shared_ptr<arrow::Schema>& schema = nullptr) const { + TReplaceKey GetFirst(const std::shared_ptr<arrow::Schema>& schema = nullptr) const { return GetKeyByIndex(0, schema); } - std::optional<TReplaceKey> GetLast(const std::shared_ptr<arrow::Schema>& schema = nullptr) const { + TReplaceKey GetLast(const std::shared_ptr<arrow::Schema>& schema = nullptr) const { return GetKeyByIndex(Data->num_rows() - 1, schema); } @@ -75,10 +75,10 @@ public: return Data; } - std::optional<TReplaceKey> GetMin(const std::shared_ptr<arrow::Schema>& schema = nullptr) const { + TReplaceKey GetMin(const std::shared_ptr<arrow::Schema>& schema = nullptr) const { return GetKeyByIndex(0, schema); } - std::optional<TReplaceKey> GetMax(const std::shared_ptr<arrow::Schema>& schema = nullptr) const { + TReplaceKey GetMax(const std::shared_ptr<arrow::Schema>& schema = nullptr) const { return GetKeyByIndex(Data->num_rows() - 1, schema); } diff --git a/ydb/core/tx/columnshard/engines/changes/compaction.cpp b/ydb/core/tx/columnshard/engines/changes/compaction.cpp index bb1f3fadc3c..6ad04f33711 100644 --- a/ydb/core/tx/columnshard/engines/changes/compaction.cpp +++ b/ydb/core/tx/columnshard/engines/changes/compaction.cpp @@ -31,13 +31,6 @@ bool TCompactColumnEngineChanges::DoApplyChanges(TColumnEngineForLogs& self, TAp return TBase::DoApplyChanges(self, context); } -ui32 TCompactColumnEngineChanges::NumSplitInto(const ui32 srcRows) const { - Y_ABORT_UNLESS(srcRows > 1); - const ui64 totalBytes = TotalBlobsSize(); - const ui32 numSplitInto = (totalBytes / Limits.GranuleSizeForOverloadPrevent) + 1; - return std::max<ui32>(2, numSplitInto); -} - void TCompactColumnEngineChanges::DoWriteIndex(NColumnShard::TColumnShard& self, TWriteIndexContext& context) { TBase::DoWriteIndex(self, context); } @@ -75,11 +68,9 @@ void TCompactColumnEngineChanges::DoOnFinish(NColumnShard::TColumnShard& self, T NeedGranuleStatusProvide = false; } -TCompactColumnEngineChanges::TCompactColumnEngineChanges(const TCompactionLimits& limits, std::shared_ptr<TGranuleMeta> granule, const std::vector<std::shared_ptr<TPortionInfo>>& portions, const TSaverContext& saverContext) - : TBase(limits.GetSplitSettings(), saverContext, StaticTypeName()) - , Limits(limits) - , GranuleMeta(granule) -{ +TCompactColumnEngineChanges::TCompactColumnEngineChanges(const TSplitSettings& splitSettings, std::shared_ptr<TGranuleMeta> granule, const std::vector<std::shared_ptr<TPortionInfo>>& portions, const TSaverContext& saverContext) + : TBase(splitSettings, saverContext, StaticTypeName()) + , GranuleMeta(granule) { Y_ABORT_UNLESS(GranuleMeta); SwitchedPortions.reserve(portions.size()); diff --git a/ydb/core/tx/columnshard/engines/changes/compaction.h b/ydb/core/tx/columnshard/engines/changes/compaction.h index e6539f940e5..7c033fdfbc0 100644 --- a/ydb/core/tx/columnshard/engines/changes/compaction.h +++ b/ydb/core/tx/columnshard/engines/changes/compaction.h @@ -12,7 +12,6 @@ private: using TBase = TChangesWithAppend; bool NeedGranuleStatusProvide = false; protected: - const TCompactionLimits Limits; std::shared_ptr<TGranuleMeta> GranuleMeta; virtual void DoStart(NColumnShard::TColumnShard& self) override; @@ -31,14 +30,12 @@ public: virtual THashSet<TPortionAddress> GetTouchedPortions() const override; - TCompactColumnEngineChanges(const TCompactionLimits& limits, std::shared_ptr<TGranuleMeta> granule, const std::vector<std::shared_ptr<TPortionInfo>>& portions, const TSaverContext& saverContext); + TCompactColumnEngineChanges(const TSplitSettings& splitSettings, std::shared_ptr<TGranuleMeta> granule, const std::vector<std::shared_ptr<TPortionInfo>>& portions, const TSaverContext& saverContext); ~TCompactColumnEngineChanges(); static TString StaticTypeName() { return "CS::GENERAL"; } - - ui32 NumSplitInto(const ui32 srcRows) const; }; } diff --git a/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp b/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp index ee366caf0f2..6c3c9caed41 100644 --- a/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp +++ b/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp @@ -1,17 +1,18 @@ + #include "general_compaction.h" -#include "compaction/column_portion_chunk.h" + #include "compaction/column_cursor.h" +#include "compaction/column_portion_chunk.h" #include "compaction/merge_context.h" #include "compaction/merged_column.h" #include "counters/general.h" +#include <ydb/core/formats/arrow/simple_builder/array.h> +#include <ydb/core/formats/arrow/simple_builder/filler.h> #include <ydb/core/tx/columnshard/columnshard_impl.h> #include <ydb/core/tx/columnshard/engines/portions/with_blobs.h> #include <ydb/core/tx/columnshard/splitter/batch_slice.h> #include <ydb/core/tx/columnshard/splitter/rb_splitter.h> -#include <ydb/core/formats/arrow/simple_builder/array.h> -#include <ydb/core/formats/arrow/simple_builder/filler.h> -#include "../reader/read_filter_merger.h" namespace NKikimr::NOlap::NCompaction { @@ -111,8 +112,8 @@ TConclusionStatus TGeneralCompactColumnEngineChanges::DoConstructBlobs(TConstruc std::map<std::string, std::vector<TColumnPortionResult>> columnChunks; ui32 batchIdx = 0; for (auto&& batchResult : batchResults) { - const ui32 portionRecordsCountLimit = batchResult->num_rows() / (batchResult->num_rows() / 10000 + 1) + 1; - TColumnMergeContext context(columnId, resultSchema, portionRecordsCountLimit, 50 * 1024 * 1024, columnInfo, SaverContext); + const ui32 portionRecordsCountLimit = batchResult->num_rows() / (batchResult->num_rows() / GetSplitSettings().GetExpectedRecordsCountOnPage() + 1) + 1; + TColumnMergeContext context(columnId, resultSchema, portionRecordsCountLimit, GetSplitSettings().GetExpectedUnpackColumnChunkRawSize(), columnInfo, SaverContext); TMergedColumn mColumn(context); auto columnPortionIdx = batchResult->GetColumnByName(portionIdFieldName); @@ -176,8 +177,7 @@ TConclusionStatus TGeneralCompactColumnEngineChanges::DoConstructBlobs(TConstruc } batchSlices.emplace_back(portionColumns, schemaDetails, context.Counters.SplitterCounters, GetSplitSettings()); } - - TSimilarSlicer slicer(4 * 1024 * 1024); + TSimilarSlicer slicer(GetSplitSettings().GetExpectedPortionSize()); auto packs = slicer.Split(batchSlices); ui32 recordIdx = 0; diff --git a/ydb/core/tx/columnshard/engines/changes/with_appended.h b/ydb/core/tx/columnshard/engines/changes/with_appended.h index bfc9ec09b4a..a398c61b005 100644 --- a/ydb/core/tx/columnshard/engines/changes/with_appended.h +++ b/ydb/core/tx/columnshard/engines/changes/with_appended.h @@ -9,8 +9,9 @@ namespace NKikimr::NOlap { class TChangesWithAppend: public TColumnEngineChanges { private: using TBase = TColumnEngineChanges; - TSplitSettings SplitSettings; + protected: + TSplitSettings SplitSettings; TSaverContext SaverContext; virtual void DoDebugString(TStringOutput& out) const override; virtual void DoCompile(TFinalizationContext& context) override; diff --git a/ydb/core/tx/columnshard/engines/portions/meta.cpp b/ydb/core/tx/columnshard/engines/portions/meta.cpp index d6d7430c068..9f6a2c30823 100644 --- a/ydb/core/tx/columnshard/engines/portions/meta.cpp +++ b/ydb/core/tx/columnshard/engines/portions/meta.cpp @@ -1,6 +1,8 @@ #include "meta.h" -#include <ydb/core/tx/columnshard/engines/scheme/index_info.h> + #include <ydb/core/formats/arrow/arrow_filter.h> +#include <ydb/core/tx/columnshard/engines/scheme/index_info.h> + #include <ydb/library/actors/core/log.h> namespace NKikimr::NOlap { @@ -10,6 +12,9 @@ void TPortionMeta::FillBatchInfo(const NArrow::TFirstLastSpecialKeys& primaryKey ReplaceKeyEdges = primaryKeys.BuildAccordingToSchemaVerified(indexInfo.GetReplaceKey()); IndexKeyStart = ReplaceKeyEdges->GetFirst(); IndexKeyEnd = ReplaceKeyEdges->GetLast(); + AFL_VERIFY(IndexKeyStart); + AFL_VERIFY(IndexKeyEnd); + AFL_VERIFY(*IndexKeyStart <= *IndexKeyEnd)("start", IndexKeyStart->DebugString())("end", IndexKeyEnd->DebugString()); } { @@ -49,6 +54,9 @@ bool TPortionMeta::DeserializeFromProto(const NKikimrTxColumnShard::TIndexPortio ReplaceKeyEdges = std::make_shared<NArrow::TFirstLastSpecialKeys>(portionMeta.GetPrimaryKeyBorders(), indexInfo.GetReplaceKey()); IndexKeyStart = ReplaceKeyEdges->GetFirst(); IndexKeyEnd = ReplaceKeyEdges->GetLast(); + AFL_VERIFY(IndexKeyStart); + AFL_VERIFY(IndexKeyEnd); + AFL_VERIFY (*IndexKeyStart <= *IndexKeyEnd)("start", IndexKeyStart->DebugString())("end", IndexKeyEnd->DebugString()); } if (portionMeta.HasRecordSnapshotMin()) { diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/blob_size.cpp b/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/blob_size.cpp index ea255ea852b..da20b642005 100644 --- a/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/blob_size.cpp +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/blob_size.cpp @@ -30,7 +30,7 @@ std::shared_ptr<NKikimr::NOlap::TColumnEngineChanges> TBlobsWithSizeLimit::Build if (currentSum > SizeLimitToMerge || PortionsCount > CountLimitToMerge) { AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "take_granule_with_small")("portions", portions.size())("current_sum", currentSum); TSaverContext saverContext(StoragesManager->GetOperator(tierName.value_or(IStoragesManager::DefaultStorageId)), StoragesManager); - return std::make_shared<NCompaction::TGeneralCompactColumnEngineChanges>(limits, granule, portions, saverContext); + return std::make_shared<NCompaction::TGeneralCompactColumnEngineChanges>(limits.GetSplitSettings(), granule, portions, saverContext); } else { AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "take_granule_with_small")("skip", "not_enough_data"); } diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/optimizer.cpp b/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/optimizer.cpp index e7dca265515..a31ebe4eb6a 100644 --- a/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/optimizer.cpp +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/optimizer.cpp @@ -65,7 +65,7 @@ std::shared_ptr<TColumnEngineChanges> TIntervalsOptimizerPlanner::DoGetOptimizat AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "take_granule")("features", features.DebugJson().GetStringRobust())("count", features.GetPortionsCount()); TSaverContext saverContext(StoragesManager->GetOperator(tierName.value_or(IStoragesManager::DefaultStorageId)), StoragesManager); - return std::make_shared<NCompaction::TGeneralCompactColumnEngineChanges>(limits, granule, portions, saverContext); + return std::make_shared<NCompaction::TGeneralCompactColumnEngineChanges>(limits.GetSplitSettings(), granule, portions, saverContext); } void TIntervalsOptimizerPlanner::RemovePortion(const std::shared_ptr<TPortionInfo>& info) { diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lbuckets/optimizer.h b/ydb/core/tx/columnshard/engines/storage/optimizer/lbuckets/optimizer.h index c286c92ef24..5aba79e5a1b 100644 --- a/ydb/core/tx/columnshard/engines/storage/optimizer/lbuckets/optimizer.h +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lbuckets/optimizer.h @@ -747,7 +747,7 @@ public: AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("stop_instant", stopInstant.value_or(TInstant::Zero()))("size", size)("next", NextBorder ? NextBorder->DebugString() : "") ("count", portions.size())("info", Others.DebugString())("event", "start_optimization")("stop_point", stopPoint ? stopPoint->DebugString() : ""); TSaverContext saverContext(storagesManager->GetOperator(IStoragesManager::DefaultStorageId), storagesManager); - auto result = std::make_shared<NCompaction::TGeneralCompactColumnEngineChanges>(limits, granule, portions, saverContext); + auto result = std::make_shared<NCompaction::TGeneralCompactColumnEngineChanges>(limits.GetSplitSettings(), granule, portions, saverContext); if (MainPortion) { NIndexedReader::TSortableBatchPosition pos(MainPortion->IndexKeyStart().ToBatch(primaryKeysSchema), 0, primaryKeysSchema->field_names(), {}, false); result->AddCheckPoint(pos, true, false); diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/levels/optimizer.h b/ydb/core/tx/columnshard/engines/storage/optimizer/levels/optimizer.h index c670cd67716..f192ae5bb67 100644 --- a/ydb/core/tx/columnshard/engines/storage/optimizer/levels/optimizer.h +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/levels/optimizer.h @@ -450,7 +450,7 @@ public: positions.emplace_back(*position); } TSaverContext saverContext(StoragesManager->GetOperator(IStoragesManager::DefaultStorageId), StoragesManager); - auto result = std::make_shared<NCompaction::TGeneralCompactColumnEngineChanges>(CompactionLimits, granule, portions, saverContext); + auto result = std::make_shared<NCompaction::TGeneralCompactColumnEngineChanges>(CompactionLimits.GetSplitSettings(), granule, portions, saverContext); for (auto&& i : positions) { result->AddCheckPoint(i); } diff --git a/ydb/core/tx/columnshard/normalizer/portion/normalizer.cpp b/ydb/core/tx/columnshard/normalizer/portion/normalizer.cpp index 0d02541c9a4..2102bd3260f 100644 --- a/ydb/core/tx/columnshard/normalizer/portion/normalizer.cpp +++ b/ydb/core/tx/columnshard/normalizer/portion/normalizer.cpp @@ -1,4 +1,5 @@ #include "normalizer.h" namespace NKikimr::NOlap { + } diff --git a/ydb/core/tx/columnshard/splitter/settings.h b/ydb/core/tx/columnshard/splitter/settings.h index b36caba4371..34bc2978379 100644 --- a/ydb/core/tx/columnshard/splitter/settings.h +++ b/ydb/core/tx/columnshard/splitter/settings.h @@ -1,5 +1,7 @@ #pragma once + #include <ydb/library/accessor/accessor.h> + #include <util/system/types.h> namespace NKikimr::NOlap { @@ -9,11 +11,22 @@ private: static const inline i64 DefaultMaxBlobSize = 8 * 1024 * 1024; static const inline i64 DefaultMinBlobSize = 4 * 1024 * 1024; static const inline i64 DefaultMinRecordsCount = 10000; - static const inline i64 DefaultMaxPortionSize = 4 * DefaultMaxBlobSize; + static const inline i64 DefaultMaxPortionSize = 6 * DefaultMaxBlobSize; YDB_ACCESSOR(i64, MaxBlobSize, DefaultMaxBlobSize); YDB_ACCESSOR(i64, MinBlobSize, DefaultMinBlobSize); YDB_ACCESSOR(i64, MinRecordsCount, DefaultMinRecordsCount); YDB_ACCESSOR(i64, MaxPortionSize, DefaultMaxPortionSize); public: + ui64 GetExpectedRecordsCountOnPage() const { + return 1.5 * MinRecordsCount; + } + + ui64 GetExpectedUnpackColumnChunkRawSize() const { + return (ui64)50 * 1024 * 1024; + } + + ui64 GetExpectedPortionSize() const { + return MaxPortionSize; + } }; } diff --git a/ydb/core/tx/columnshard/splitter/simple.h b/ydb/core/tx/columnshard/splitter/simple.h index 2366cdd4416..6746f4c098e 100644 --- a/ydb/core/tx/columnshard/splitter/simple.h +++ b/ydb/core/tx/columnshard/splitter/simple.h @@ -24,14 +24,12 @@ public: std::shared_ptr<arrow::Scalar> GetFirstScalar() const; std::shared_ptr<arrow::Scalar> GetLastScalar() const; - TSaverSplittedChunk(std::shared_ptr<arrow::RecordBatch> batch, TString&& serializedChunk) + TSaverSplittedChunk(const std::shared_ptr<arrow::RecordBatch>& batch, TString&& serializedChunk) : SlicedBatch(batch) - , SerializedChunk(std::move(serializedChunk)) - { + , SerializedChunk(std::move(serializedChunk)) { Y_ABORT_UNLESS(SlicedBatch); Y_ABORT_UNLESS(SlicedBatch->num_columns() == 1); Y_ABORT_UNLESS(SlicedBatch->num_rows()); - } bool IsCompatibleColumn(const std::shared_ptr<arrow::Field>& f) const { @@ -39,9 +37,11 @@ public: return false; } if (SlicedBatch->num_columns() != 1) { + AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "unexpected columns count")("expectation", 1)("actual", SlicedBatch->num_columns()); return false; } if (!SlicedBatch->schema()->fields().front()->Equals(f)) { + AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "unexpected column type")("expectation", f->ToString())("actual", SlicedBatch->schema()->fields().front()->ToString()); return false; } return true; |