about | summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author ivanmorozov333 <111685085+ivanmorozov333@users.noreply.github.com> 2024-01-03 11:37:33 +0300
committer GitHub <noreply@github.com> 2024-01-03 11:37:33 +0300
commit befcdc2455c131b2184d45f8547c8123fc0390f3 (patch)
tree 9bbf5bca174eeca8df3026006a06112f66a55275
parent 080882eaae1f6d4870b648699b6c7600ebecb36c (diff)
download ydb-befcdc2455c131b2184d45f8547c8123fc0390f3.tar.gz
KIKIMR-20380: min max portion keys validation (#822)
* validation
* corrections
* clean configuration for splitting
-rw-r--r-- ydb/core/formats/arrow/special_keys.cpp 2
-rw-r--r-- ydb/core/formats/arrow/special_keys.h 10
-rw-r--r-- ydb/core/tx/columnshard/engines/changes/compaction.cpp 15
-rw-r--r-- ydb/core/tx/columnshard/engines/changes/compaction.h 5
-rw-r--r-- ydb/core/tx/columnshard/engines/changes/general_compaction.cpp 16
-rw-r--r-- ydb/core/tx/columnshard/engines/changes/with_appended.h 3
-rw-r--r-- ydb/core/tx/columnshard/engines/portions/meta.cpp 10
-rw-r--r-- ydb/core/tx/columnshard/engines/storage/optimizer/intervals/blob_size.cpp 2
-rw-r--r-- ydb/core/tx/columnshard/engines/storage/optimizer/intervals/optimizer.cpp 2
-rw-r--r-- ydb/core/tx/columnshard/engines/storage/optimizer/lbuckets/optimizer.h 2
-rw-r--r-- ydb/core/tx/columnshard/engines/storage/optimizer/levels/optimizer.h 2
-rw-r--r-- ydb/core/tx/columnshard/normalizer/portion/normalizer.cpp 1
-rw-r--r-- ydb/core/tx/columnshard/splitter/settings.h 15
-rw-r--r-- ydb/core/tx/columnshard/splitter/simple.h 8
14 files changed, 52 insertions, 41 deletions
diff --git a/ydb/core/formats/arrow/special_keys.cpp b/ydb/core/formats/arrow/special_keys.cpp
index 83908f38ad1..0006b339464 100644
--- a/ydb/core/formats/arrow/special_keys.cpp
+++ b/ydb/core/formats/arrow/special_keys.cpp
@@ -15,7 +15,7 @@ bool TSpecialKeys::DeserializeFromString(const TString& data) {
return !!Data;
}
-std::optional<NKikimr::NArrow::TReplaceKey> TSpecialKeys::GetKeyByIndex(const ui32 position, const std::shared_ptr<arrow::Schema>& schema) const {
+NKikimr::NArrow::TReplaceKey TSpecialKeys::GetKeyByIndex(const ui32 position, const std::shared_ptr<arrow::Schema>& schema) const {
Y_ABORT_UNLESS(position < Data->num_rows());
if (schema) {
return NArrow::TReplaceKey::FromBatch(Data, schema, position);
diff --git a/ydb/core/formats/arrow/special_keys.h b/ydb/core/formats/arrow/special_keys.h
index e777c5511c2..f157ce089fe 100644
--- a/ydb/core/formats/arrow/special_keys.h
+++ b/ydb/core/formats/arrow/special_keys.h
@@ -10,7 +10,7 @@ protected:
bool DeserializeFromString(const TString& data);
- std::optional<TReplaceKey> GetKeyByIndex(const ui32 position, const std::shared_ptr<arrow::Schema>& schema) const;
+ TReplaceKey GetKeyByIndex(const ui32 position, const std::shared_ptr<arrow::Schema>& schema) const;
TSpecialKeys() = default;
TSpecialKeys(std::shared_ptr<arrow::RecordBatch> data)
@@ -45,10 +45,10 @@ public:
std::shared_ptr<TFirstLastSpecialKeys> BuildAccordingToSchemaVerified(const std::shared_ptr<arrow::Schema>& schema) const;
- std::optional<TReplaceKey> GetFirst(const std::shared_ptr<arrow::Schema>& schema = nullptr) const {
+ TReplaceKey GetFirst(const std::shared_ptr<arrow::Schema>& schema = nullptr) const {
return GetKeyByIndex(0, schema);
}
- std::optional<TReplaceKey> GetLast(const std::shared_ptr<arrow::Schema>& schema = nullptr) const {
+ TReplaceKey GetLast(const std::shared_ptr<arrow::Schema>& schema = nullptr) const {
return GetKeyByIndex(Data->num_rows() - 1, schema);
}
@@ -75,10 +75,10 @@ public:
return Data;
}
- std::optional<TReplaceKey> GetMin(const std::shared_ptr<arrow::Schema>& schema = nullptr) const {
+ TReplaceKey GetMin(const std::shared_ptr<arrow::Schema>& schema = nullptr) const {
return GetKeyByIndex(0, schema);
}
- std::optional<TReplaceKey> GetMax(const std::shared_ptr<arrow::Schema>& schema = nullptr) const {
+ TReplaceKey GetMax(const std::shared_ptr<arrow::Schema>& schema = nullptr) const {
return GetKeyByIndex(Data->num_rows() - 1, schema);
}
diff --git a/ydb/core/tx/columnshard/engines/changes/compaction.cpp b/ydb/core/tx/columnshard/engines/changes/compaction.cpp
index bb1f3fadc3c..6ad04f33711 100644
--- a/ydb/core/tx/columnshard/engines/changes/compaction.cpp
+++ b/ydb/core/tx/columnshard/engines/changes/compaction.cpp
@@ -31,13 +31,6 @@ bool TCompactColumnEngineChanges::DoApplyChanges(TColumnEngineForLogs& self, TAp
return TBase::DoApplyChanges(self, context);
}
-ui32 TCompactColumnEngineChanges::NumSplitInto(const ui32 srcRows) const {
- Y_ABORT_UNLESS(srcRows > 1);
- const ui64 totalBytes = TotalBlobsSize();
- const ui32 numSplitInto = (totalBytes / Limits.GranuleSizeForOverloadPrevent) + 1;
- return std::max<ui32>(2, numSplitInto);
-}
-
void TCompactColumnEngineChanges::DoWriteIndex(NColumnShard::TColumnShard& self, TWriteIndexContext& context) {
TBase::DoWriteIndex(self, context);
}
@@ -75,11 +68,9 @@ void TCompactColumnEngineChanges::DoOnFinish(NColumnShard::TColumnShard& self, T
NeedGranuleStatusProvide = false;
}
-TCompactColumnEngineChanges::TCompactColumnEngineChanges(const TCompactionLimits& limits, std::shared_ptr<TGranuleMeta> granule, const std::vector<std::shared_ptr<TPortionInfo>>& portions, const TSaverContext& saverContext)
- : TBase(limits.GetSplitSettings(), saverContext, StaticTypeName())
- , Limits(limits)
- , GranuleMeta(granule)
-{
+TCompactColumnEngineChanges::TCompactColumnEngineChanges(const TSplitSettings& splitSettings, std::shared_ptr<TGranuleMeta> granule, const std::vector<std::shared_ptr<TPortionInfo>>& portions, const TSaverContext& saverContext)
+ : TBase(splitSettings, saverContext, StaticTypeName())
+ , GranuleMeta(granule) {
Y_ABORT_UNLESS(GranuleMeta);
SwitchedPortions.reserve(portions.size());
diff --git a/ydb/core/tx/columnshard/engines/changes/compaction.h b/ydb/core/tx/columnshard/engines/changes/compaction.h
index e6539f940e5..7c033fdfbc0 100644
--- a/ydb/core/tx/columnshard/engines/changes/compaction.h
+++ b/ydb/core/tx/columnshard/engines/changes/compaction.h
@@ -12,7 +12,6 @@ private:
using TBase = TChangesWithAppend;
bool NeedGranuleStatusProvide = false;
protected:
- const TCompactionLimits Limits;
std::shared_ptr<TGranuleMeta> GranuleMeta;
virtual void DoStart(NColumnShard::TColumnShard& self) override;
@@ -31,14 +30,12 @@ public:
virtual THashSet<TPortionAddress> GetTouchedPortions() const override;
- TCompactColumnEngineChanges(const TCompactionLimits& limits, std::shared_ptr<TGranuleMeta> granule, const std::vector<std::shared_ptr<TPortionInfo>>& portions, const TSaverContext& saverContext);
+ TCompactColumnEngineChanges(const TSplitSettings& splitSettings, std::shared_ptr<TGranuleMeta> granule, const std::vector<std::shared_ptr<TPortionInfo>>& portions, const TSaverContext& saverContext);
~TCompactColumnEngineChanges();
static TString StaticTypeName() {
return "CS::GENERAL";
}
-
- ui32 NumSplitInto(const ui32 srcRows) const;
};
}
diff --git a/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp b/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp
index ee366caf0f2..6c3c9caed41 100644
--- a/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp
+++ b/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp
@@ -1,17 +1,18 @@
+
#include "general_compaction.h"
-#include "compaction/column_portion_chunk.h"
+
#include "compaction/column_cursor.h"
+#include "compaction/column_portion_chunk.h"
#include "compaction/merge_context.h"
#include "compaction/merged_column.h"
#include "counters/general.h"
+#include <ydb/core/formats/arrow/simple_builder/array.h>
+#include <ydb/core/formats/arrow/simple_builder/filler.h>
#include <ydb/core/tx/columnshard/columnshard_impl.h>
#include <ydb/core/tx/columnshard/engines/portions/with_blobs.h>
#include <ydb/core/tx/columnshard/splitter/batch_slice.h>
#include <ydb/core/tx/columnshard/splitter/rb_splitter.h>
-#include <ydb/core/formats/arrow/simple_builder/array.h>
-#include <ydb/core/formats/arrow/simple_builder/filler.h>
-#include "../reader/read_filter_merger.h"
namespace NKikimr::NOlap::NCompaction {
@@ -111,8 +112,8 @@ TConclusionStatus TGeneralCompactColumnEngineChanges::DoConstructBlobs(TConstruc
std::map<std::string, std::vector<TColumnPortionResult>> columnChunks;
ui32 batchIdx = 0;
for (auto&& batchResult : batchResults) {
- const ui32 portionRecordsCountLimit = batchResult->num_rows() / (batchResult->num_rows() / 10000 + 1) + 1;
- TColumnMergeContext context(columnId, resultSchema, portionRecordsCountLimit, 50 * 1024 * 1024, columnInfo, SaverContext);
+ const ui32 portionRecordsCountLimit = batchResult->num_rows() / (batchResult->num_rows() / GetSplitSettings().GetExpectedRecordsCountOnPage() + 1) + 1;
+ TColumnMergeContext context(columnId, resultSchema, portionRecordsCountLimit, GetSplitSettings().GetExpectedUnpackColumnChunkRawSize(), columnInfo, SaverContext);
TMergedColumn mColumn(context);
auto columnPortionIdx = batchResult->GetColumnByName(portionIdFieldName);
@@ -176,8 +177,7 @@ TConclusionStatus TGeneralCompactColumnEngineChanges::DoConstructBlobs(TConstruc
}
batchSlices.emplace_back(portionColumns, schemaDetails, context.Counters.SplitterCounters, GetSplitSettings());
}
-
- TSimilarSlicer slicer(4 * 1024 * 1024);
+ TSimilarSlicer slicer(GetSplitSettings().GetExpectedPortionSize());
auto packs = slicer.Split(batchSlices);
ui32 recordIdx = 0;
diff --git a/ydb/core/tx/columnshard/engines/changes/with_appended.h b/ydb/core/tx/columnshard/engines/changes/with_appended.h
index bfc9ec09b4a..a398c61b005 100644
--- a/ydb/core/tx/columnshard/engines/changes/with_appended.h
+++ b/ydb/core/tx/columnshard/engines/changes/with_appended.h
@@ -9,8 +9,9 @@ namespace NKikimr::NOlap {
class TChangesWithAppend: public TColumnEngineChanges {
private:
using TBase = TColumnEngineChanges;
- TSplitSettings SplitSettings;
+
protected:
+ TSplitSettings SplitSettings;
TSaverContext SaverContext;
virtual void DoDebugString(TStringOutput& out) const override;
virtual void DoCompile(TFinalizationContext& context) override;
diff --git a/ydb/core/tx/columnshard/engines/portions/meta.cpp b/ydb/core/tx/columnshard/engines/portions/meta.cpp
index d6d7430c068..9f6a2c30823 100644
--- a/ydb/core/tx/columnshard/engines/portions/meta.cpp
+++ b/ydb/core/tx/columnshard/engines/portions/meta.cpp
@@ -1,6 +1,8 @@
#include "meta.h"
-#include <ydb/core/tx/columnshard/engines/scheme/index_info.h>
+
#include <ydb/core/formats/arrow/arrow_filter.h>
+#include <ydb/core/tx/columnshard/engines/scheme/index_info.h>
+
#include <ydb/library/actors/core/log.h>
namespace NKikimr::NOlap {
@@ -10,6 +12,9 @@ void TPortionMeta::FillBatchInfo(const NArrow::TFirstLastSpecialKeys& primaryKey
ReplaceKeyEdges = primaryKeys.BuildAccordingToSchemaVerified(indexInfo.GetReplaceKey());
IndexKeyStart = ReplaceKeyEdges->GetFirst();
IndexKeyEnd = ReplaceKeyEdges->GetLast();
+ AFL_VERIFY(IndexKeyStart);
+ AFL_VERIFY(IndexKeyEnd);
+ AFL_VERIFY(*IndexKeyStart <= *IndexKeyEnd)("start", IndexKeyStart->DebugString())("end", IndexKeyEnd->DebugString());
}
{
@@ -49,6 +54,9 @@ bool TPortionMeta::DeserializeFromProto(const NKikimrTxColumnShard::TIndexPortio
ReplaceKeyEdges = std::make_shared<NArrow::TFirstLastSpecialKeys>(portionMeta.GetPrimaryKeyBorders(), indexInfo.GetReplaceKey());
IndexKeyStart = ReplaceKeyEdges->GetFirst();
IndexKeyEnd = ReplaceKeyEdges->GetLast();
+ AFL_VERIFY(IndexKeyStart);
+ AFL_VERIFY(IndexKeyEnd);
+ AFL_VERIFY (*IndexKeyStart <= *IndexKeyEnd)("start", IndexKeyStart->DebugString())("end", IndexKeyEnd->DebugString());
}
if (portionMeta.HasRecordSnapshotMin()) {
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/blob_size.cpp b/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/blob_size.cpp
index ea255ea852b..da20b642005 100644
--- a/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/blob_size.cpp
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/blob_size.cpp
@@ -30,7 +30,7 @@ std::shared_ptr<NKikimr::NOlap::TColumnEngineChanges> TBlobsWithSizeLimit::Build
if (currentSum > SizeLimitToMerge || PortionsCount > CountLimitToMerge) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "take_granule_with_small")("portions", portions.size())("current_sum", currentSum);
TSaverContext saverContext(StoragesManager->GetOperator(tierName.value_or(IStoragesManager::DefaultStorageId)), StoragesManager);
- return std::make_shared<NCompaction::TGeneralCompactColumnEngineChanges>(limits, granule, portions, saverContext);
+ return std::make_shared<NCompaction::TGeneralCompactColumnEngineChanges>(limits.GetSplitSettings(), granule, portions, saverContext);
} else {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "take_granule_with_small")("skip", "not_enough_data");
}
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/optimizer.cpp b/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/optimizer.cpp
index e7dca265515..a31ebe4eb6a 100644
--- a/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/optimizer.cpp
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/intervals/optimizer.cpp
@@ -65,7 +65,7 @@ std::shared_ptr<TColumnEngineChanges> TIntervalsOptimizerPlanner::DoGetOptimizat
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "take_granule")("features", features.DebugJson().GetStringRobust())("count", features.GetPortionsCount());
TSaverContext saverContext(StoragesManager->GetOperator(tierName.value_or(IStoragesManager::DefaultStorageId)), StoragesManager);
- return std::make_shared<NCompaction::TGeneralCompactColumnEngineChanges>(limits, granule, portions, saverContext);
+ return std::make_shared<NCompaction::TGeneralCompactColumnEngineChanges>(limits.GetSplitSettings(), granule, portions, saverContext);
}
void TIntervalsOptimizerPlanner::RemovePortion(const std::shared_ptr<TPortionInfo>& info) {
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lbuckets/optimizer.h b/ydb/core/tx/columnshard/engines/storage/optimizer/lbuckets/optimizer.h
index c286c92ef24..5aba79e5a1b 100644
--- a/ydb/core/tx/columnshard/engines/storage/optimizer/lbuckets/optimizer.h
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lbuckets/optimizer.h
@@ -747,7 +747,7 @@ public:
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("stop_instant", stopInstant.value_or(TInstant::Zero()))("size", size)("next", NextBorder ? NextBorder->DebugString() : "")
("count", portions.size())("info", Others.DebugString())("event", "start_optimization")("stop_point", stopPoint ? stopPoint->DebugString() : "");
TSaverContext saverContext(storagesManager->GetOperator(IStoragesManager::DefaultStorageId), storagesManager);
- auto result = std::make_shared<NCompaction::TGeneralCompactColumnEngineChanges>(limits, granule, portions, saverContext);
+ auto result = std::make_shared<NCompaction::TGeneralCompactColumnEngineChanges>(limits.GetSplitSettings(), granule, portions, saverContext);
if (MainPortion) {
NIndexedReader::TSortableBatchPosition pos(MainPortion->IndexKeyStart().ToBatch(primaryKeysSchema), 0, primaryKeysSchema->field_names(), {}, false);
result->AddCheckPoint(pos, true, false);
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/levels/optimizer.h b/ydb/core/tx/columnshard/engines/storage/optimizer/levels/optimizer.h
index c670cd67716..f192ae5bb67 100644
--- a/ydb/core/tx/columnshard/engines/storage/optimizer/levels/optimizer.h
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/levels/optimizer.h
@@ -450,7 +450,7 @@ public:
positions.emplace_back(*position);
}
TSaverContext saverContext(StoragesManager->GetOperator(IStoragesManager::DefaultStorageId), StoragesManager);
- auto result = std::make_shared<NCompaction::TGeneralCompactColumnEngineChanges>(CompactionLimits, granule, portions, saverContext);
+ auto result = std::make_shared<NCompaction::TGeneralCompactColumnEngineChanges>(CompactionLimits.GetSplitSettings(), granule, portions, saverContext);
for (auto&& i : positions) {
result->AddCheckPoint(i);
}
diff --git a/ydb/core/tx/columnshard/normalizer/portion/normalizer.cpp b/ydb/core/tx/columnshard/normalizer/portion/normalizer.cpp
index 0d02541c9a4..2102bd3260f 100644
--- a/ydb/core/tx/columnshard/normalizer/portion/normalizer.cpp
+++ b/ydb/core/tx/columnshard/normalizer/portion/normalizer.cpp
@@ -1,4 +1,5 @@
#include "normalizer.h"
namespace NKikimr::NOlap {
+
}
diff --git a/ydb/core/tx/columnshard/splitter/settings.h b/ydb/core/tx/columnshard/splitter/settings.h
index b36caba4371..34bc2978379 100644
--- a/ydb/core/tx/columnshard/splitter/settings.h
+++ b/ydb/core/tx/columnshard/splitter/settings.h
@@ -1,5 +1,7 @@
#pragma once
+
#include <ydb/library/accessor/accessor.h>
+
#include <util/system/types.h>
namespace NKikimr::NOlap {
@@ -9,11 +11,22 @@ private:
static const inline i64 DefaultMaxBlobSize = 8 * 1024 * 1024;
static const inline i64 DefaultMinBlobSize = 4 * 1024 * 1024;
static const inline i64 DefaultMinRecordsCount = 10000;
- static const inline i64 DefaultMaxPortionSize = 4 * DefaultMaxBlobSize;
+ static const inline i64 DefaultMaxPortionSize = 6 * DefaultMaxBlobSize;
YDB_ACCESSOR(i64, MaxBlobSize, DefaultMaxBlobSize);
YDB_ACCESSOR(i64, MinBlobSize, DefaultMinBlobSize);
YDB_ACCESSOR(i64, MinRecordsCount, DefaultMinRecordsCount);
YDB_ACCESSOR(i64, MaxPortionSize, DefaultMaxPortionSize);
public:
+ ui64 GetExpectedRecordsCountOnPage() const {
+ return 1.5 * MinRecordsCount;
+ }
+
+ ui64 GetExpectedUnpackColumnChunkRawSize() const {
+ return (ui64)50 * 1024 * 1024;
+ }
+
+ ui64 GetExpectedPortionSize() const {
+ return MaxPortionSize;
+ }
};
}
diff --git a/ydb/core/tx/columnshard/splitter/simple.h b/ydb/core/tx/columnshard/splitter/simple.h
index 2366cdd4416..6746f4c098e 100644
--- a/ydb/core/tx/columnshard/splitter/simple.h
+++ b/ydb/core/tx/columnshard/splitter/simple.h
@@ -24,14 +24,12 @@ public:
std::shared_ptr<arrow::Scalar> GetFirstScalar() const;
std::shared_ptr<arrow::Scalar> GetLastScalar() const;
- TSaverSplittedChunk(std::shared_ptr<arrow::RecordBatch> batch, TString&& serializedChunk)
+ TSaverSplittedChunk(const std::shared_ptr<arrow::RecordBatch>& batch, TString&& serializedChunk)
: SlicedBatch(batch)
- , SerializedChunk(std::move(serializedChunk))
- {
+ , SerializedChunk(std::move(serializedChunk)) {
Y_ABORT_UNLESS(SlicedBatch);
Y_ABORT_UNLESS(SlicedBatch->num_columns() == 1);
Y_ABORT_UNLESS(SlicedBatch->num_rows());
-
}
bool IsCompatibleColumn(const std::shared_ptr<arrow::Field>& f) const {
@@ -39,9 +37,11 @@ public:
return false;
}
if (SlicedBatch->num_columns() != 1) {
+ AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "unexpected columns count")("expectation", 1)("actual", SlicedBatch->num_columns());
return false;
}
if (!SlicedBatch->schema()->fields().front()->Equals(f)) {
+ AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "unexpected column type")("expectation", f->ToString())("actual", SlicedBatch->schema()->fields().front()->ToString());
return false;
}
return true;