diff options
author | ivanmorozov333 <ivanmorozov@ydb.tech> | 2025-05-19 19:24:28 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-05-19 19:24:28 +0300 |
commit | 0ccb7680e0d44713fec49f415247e65f56b9a74c (patch) | |
tree | c8b6fd4d3da0298fd19546981b8aa27530d68a9c | |
parent | ae7b1d937a8ca2fe77ce76b19eaf17fa9e267925 (diff) | |
download | ydb-0ccb7680e0d44713fec49f415247e65f56b9a74c.tar.gz |
snapshot interval (#18452)
Co-authored-by: ivanmorozov333 <imorozov333@ya.ru>
57 files changed, 1085 insertions, 382 deletions
diff --git a/ydb/core/formats/arrow/accessor/abstract/accessor.h b/ydb/core/formats/arrow/accessor/abstract/accessor.h index 692c3cd3c33..1c884adc9d3 100644 --- a/ydb/core/formats/arrow/accessor/abstract/accessor.h +++ b/ydb/core/formats/arrow/accessor/abstract/accessor.h @@ -367,7 +367,7 @@ public: for (ui32 currentIndex = 0; currentIndex < arr->GetRecordsCount();) { arrCurrent = arr->GetArray(arrCurrent, currentIndex, arr); auto result = actor(arrCurrent->GetArray()); - if (!!result) { + if (result) { return result; } currentIndex = currentIndex + arrCurrent->GetArray()->GetRecordsCount(); diff --git a/ydb/core/formats/arrow/accessor/sub_columns/accessor.cpp b/ydb/core/formats/arrow/accessor/sub_columns/accessor.cpp index f33a04785e6..71c541d93f4 100644 --- a/ydb/core/formats/arrow/accessor/sub_columns/accessor.cpp +++ b/ydb/core/formats/arrow/accessor/sub_columns/accessor.cpp @@ -17,9 +17,9 @@ namespace NKikimr::NArrow::NAccessor { TConclusion<std::shared_ptr<TSubColumnsArray>> TSubColumnsArray::Make( - const std::shared_ptr<IChunkedArray>& sourceArray, const NSubColumns::TSettings& settings) { + const std::shared_ptr<IChunkedArray>& sourceArray, const NSubColumns::TSettings& settings, const std::shared_ptr<arrow::DataType>& columnType) { AFL_VERIFY(sourceArray); - NSubColumns::TDataBuilder builder(sourceArray->GetDataType(), settings); + NSubColumns::TDataBuilder builder(columnType, settings); IChunkedArray::TReader reader(sourceArray); std::vector<std::shared_ptr<arrow::Array>> storage; for (ui32 i = 0; i < reader.GetRecordsCount();) { @@ -40,6 +40,7 @@ TSubColumnsArray::TSubColumnsArray(const std::shared_ptr<arrow::DataType>& type, , ColumnsData(NSubColumns::TColumnsData::BuildEmpty(recordsCount)) , OthersData(NSubColumns::TOthersData::BuildEmpty()) , Settings(settings) { + AFL_VERIFY(type->id() == arrow::binary()->id())("type", type->ToString())("error", "currently supported JsonDocument only"); } TSubColumnsArray::TSubColumnsArray(NSubColumns::TColumnsData&& columns, NSubColumns::TOthersData&& others, @@ -48,6 +49,7 @@ TSubColumnsArray::TSubColumnsArray(NSubColumns::TColumnsData&& columns, NSubColu , ColumnsData(std::move(columns)) , OthersData(std::move(others)) , Settings(settings) { + AFL_VERIFY(type->id() == arrow::binary()->id())("type", type->ToString())("error", "currently supported JsonDocument only"); } TString TSubColumnsArray::SerializeToString(const TChunkConstructionData& externalInfo) const { diff --git a/ydb/core/formats/arrow/accessor/sub_columns/accessor.h b/ydb/core/formats/arrow/accessor/sub_columns/accessor.h index 2fd9f60329f..6ff2dd1635a 100644 --- a/ydb/core/formats/arrow/accessor/sub_columns/accessor.h +++ b/ydb/core/formats/arrow/accessor/sub_columns/accessor.h @@ -101,8 +101,8 @@ public: TSubColumnsArray(NSubColumns::TColumnsData&& columns, NSubColumns::TOthersData&& others, const std::shared_ptr<arrow::DataType>& type, const ui32 recordsCount, const NSubColumns::TSettings& settings); - static TConclusion<std::shared_ptr<TSubColumnsArray>> Make( - const std::shared_ptr<IChunkedArray>& sourceArray, const NSubColumns::TSettings& settings); + static TConclusion<std::shared_ptr<TSubColumnsArray>> Make(const std::shared_ptr<IChunkedArray>& sourceArray, + const NSubColumns::TSettings& settings, const std::shared_ptr<arrow::DataType>& columnType); TSubColumnsArray(const std::shared_ptr<arrow::DataType>& type, const ui32 recordsCount, const NSubColumns::TSettings& settings); diff --git a/ydb/core/formats/arrow/accessor/sub_columns/constructor.cpp b/ydb/core/formats/arrow/accessor/sub_columns/constructor.cpp index 900049f303b..94b265cdae9 100644 --- a/ydb/core/formats/arrow/accessor/sub_columns/constructor.cpp +++ b/ydb/core/formats/arrow/accessor/sub_columns/constructor.cpp @@ -66,8 +66,8 @@ bool TConstructor::DoDeserializeFromProto(const NKikimrArrowAccessorProto::TCons } TConclusion<std::shared_ptr<IChunkedArray>> TConstructor::DoConstruct( - const std::shared_ptr<IChunkedArray>& originalData, const TChunkConstructionData& /*externalInfo*/) const { - auto conclusion = NAccessor::TSubColumnsArray::Make(originalData, Settings); + const std::shared_ptr<IChunkedArray>& originalData, const TChunkConstructionData& externalInfo) const { + auto conclusion = NAccessor::TSubColumnsArray::Make(originalData, Settings, externalInfo.GetColumnType()); if (conclusion.IsFail()) { return conclusion; } diff --git a/ydb/core/formats/arrow/accessor/sub_columns/ut/ut_sub_columns.cpp b/ydb/core/formats/arrow/accessor/sub_columns/ut/ut_sub_columns.cpp index edeef71ee95..17e5643695d 100644 --- a/ydb/core/formats/arrow/accessor/sub_columns/ut/ut_sub_columns.cpp +++ b/ydb/core/formats/arrow/accessor/sub_columns/ut/ut_sub_columns.cpp @@ -71,7 +71,7 @@ Y_UNIT_TEST_SUITE(SubColumnsArrayAccessor) { ++idx; } auto bJsonArr = arrBuilder.Finish(jsons.size()); - auto arrData = TSubColumnsArray::Make(bJsonArr, settings).DetachResult(); + auto arrData = TSubColumnsArray::Make(bJsonArr, settings, bJsonArr->GetDataType()).DetachResult(); Cerr << arrData->DebugJson() << Endl; AFL_VERIFY(PrintBinaryJsons(arrData->GetChunkedArray()) == R"([[{"a":"1","b":"1","c":"111"},null,{"a1":"2","b":"2","c":"222"},{"a":"3","b":"3","c":"333"},null,{"a":"5","b1":"5"}]])")( "string", PrintBinaryJsons(arrData->GetChunkedArray())); @@ -163,7 +163,7 @@ Y_UNIT_TEST_SUITE(SubColumnsArrayAccessor) { ++idx; } auto bJsonArr = arrBuilder.Finish(jsons.size()); - auto arrData = TSubColumnsArray::Make(bJsonArr, settings).DetachResult(); + auto arrData = TSubColumnsArray::Make(bJsonArr, settings, bJsonArr->GetDataType()).DetachResult(); Cerr << arrData->DebugJson() << Endl; AFL_VERIFY(PrintBinaryJsons(arrData->GetChunkedArray()) == R"([[{"a":"1","b":"1","c":"111"},null,{"a1":"2","b":"2","c":"222"},{"a":"3","b":"3","c":"333"},null,{"a":"5","b1":"5"}]])")( "string", PrintBinaryJsons(arrData->GetChunkedArray())); diff --git a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp index 9768e24c060..9283bae146a 100644 --- a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp +++ b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp @@ -3220,7 +3220,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) { auto rows = ExecuteScanQuery(tableClient, R"( SELECT level, SUM(records_count) as sum_records_count FROM ( --!syntax_v1 - SELECT JSON_VALUE(CAST(`Details` AS JsonDocument), '$.level') as level, CAST(JSON_VALUE(CAST(`Details` AS JsonDocument), '$.portions.records_count') AS Uint64) as records_count, Details + SELECT JSON_VALUE(CAST(`Details` AS JsonDocument), '$.level') as level, CAST(JSON_VALUE(CAST(`Details` AS JsonDocument), '$.selectivity.default.records_count') AS Uint64) as records_count, Details FROM `/Root/olapStore/olapTable/.sys/primary_index_optimizer_stats` ) GROUP BY level diff --git a/ydb/core/kqp/ut/olap/optimizer_ut.cpp b/ydb/core/kqp/ut/olap/optimizer_ut.cpp index b9f87f89a7b..b4d7ac2ac45 100644 --- a/ydb/core/kqp/ut/olap/optimizer_ut.cpp +++ b/ydb/core/kqp/ut/olap/optimizer_ut.cpp @@ -16,6 +16,147 @@ namespace NKikimr::NKqp { Y_UNIT_TEST_SUITE(KqpOlapOptimizer) { + Y_UNIT_TEST(SpecialSliceToOneLayer) { + auto settings = TKikimrSettings().SetWithSampleTables(false); + TKikimrRunner kikimr(settings); + + auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NYDBTest::NColumnShard::TController>(); + csController->SetOverridePeriodicWakeupActivationPeriod(TDuration::Seconds(1)); + csController->SetOverrideLagForCompactionBeforeTierings(TDuration::Seconds(1)); + csController->SetOverrideMemoryLimitForPortionReading(1e+10); + csController->SetOverrideBlobSplitSettings(NOlap::NSplitter::TSplitSettings().SetMaxPortionSize(30000)); + + TLocalHelper(kikimr).CreateTestOlapTable("olapTable", "olapStore", 1, 1); + auto tableClient = kikimr.GetTableClient(); + + { + auto alterQuery = + R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_OPTIONS, `COMPACTION_PLANNER.CLASS_NAME`=`lc-buckets`, `COMPACTION_PLANNER.FEATURES`=` + {"levels" : [{"class_name" : "Zero", "expected_blobs_size" : 20000, "portions_size_limit" : 400000, "portions_count_available" : 2}, + {"class_name" : "Zero", "expected_blobs_size" : 20000, "portions_count_available" : 1, "default_selector_name" : "slice"}, + {"class_name" : "OneLayer", "expected_portion_size" : 40000, "size_limit_guarantee" : 100000000, "bytes_limit_fraction" : 1}], + "selectors" : [{"class_name" : "Transparent", "name" : "default"}, {"class_name" : "Snapshot", "name" : "slice", "interval" : {"finish_seconds_utc" : 0}}]}`); + )"; + auto session = tableClient.CreateSession().GetValueSync().GetSession(); + auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), NYdb::EStatus::SUCCESS, alterResult.GetIssues().ToString()); + } + + for (ui32 i = 0; i < 100; ++i) { + WriteTestData(kikimr, "/Root/olapStore/olapTable", 0, i * 1000, 10000); + if (i % 10 == 0) { + csController->WaitCompactions(TDuration::MilliSeconds(10)); + } + } + csController->WaitCompactions(TDuration::Seconds(10)); + + { + auto it = tableClient + .StreamExecuteScanQuery(R"( + --!syntax_v1 + SELECT COUNT(*) + FROM `/Root/olapStore/olapTable` + )") + .GetValueSync(); + UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString()); + TString result = StreamResultToYson(it); + Cout << result << Endl; + CompareYson(result, R"([[109000u;]])"); + } + + { + auto it = tableClient + .StreamExecuteScanQuery(R"( + --!syntax_v1 + SELECT CAST(JSON_VALUE(CAST(Details AS JsonDocument), "$.level") AS Uint64) AS LEVEL, CAST(JSON_VALUE(CAST(Details AS JsonDocument), "$.selectivity.default.records_count") AS Uint64) AS RECORDS_COUNT + FROM `/Root/olapStore/olapTable/.sys/primary_index_optimizer_stats` + ORDER BY LEVEL + )") + .GetValueSync(); + UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString()); + + auto rows = CollectRows(it); + AFL_VERIFY(rows.size() == 3); + AFL_VERIFY(0 == GetUint64(rows[0].at("LEVEL"))); + AFL_VERIFY(GetUint64(rows[0].at("RECORDS_COUNT")) == 0); + AFL_VERIFY(1 == GetUint64(rows[1].at("LEVEL"))); + AFL_VERIFY(GetUint64(rows[1].at("RECORDS_COUNT")) >= 440000); + AFL_VERIFY(GetUint64(rows[1].at("RECORDS_COUNT")) <= 550000); + AFL_VERIFY(2 == GetUint64(rows[2].at("LEVEL"))); + AFL_VERIFY(GetUint64(rows[2].at("RECORDS_COUNT")) == 0); + + for (auto&& i : rows) { + Cerr << GetUint64(i.at("LEVEL")) << "/" << GetUint64(i.at("RECORDS_COUNT")) << Endl; + } + } + + { + auto alterQuery = + TStringBuilder() << Sprintf( + R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_OPTIONS, `COMPACTION_PLANNER.CLASS_NAME`=`lc-buckets`, `COMPACTION_PLANNER.FEATURES`=` + {"levels" : [{"class_name" : "Zero", "expected_blobs_size" : 20000, "portions_size_limit" : 100000, "portions_count_available" : 1}, + {"class_name" : "Zero", "expected_blobs_size" : 20000, "portions_size_limit" : 100000, "portions_count_available" : 1, "default_selector_name" : "slice"}, + {"class_name" : "OneLayer", "expected_portion_size" : 40000, "size_limit_guarantee" : 100000000, "bytes_limit_fraction" : 1}], + "selectors" : [{"class_name" : "Snapshot", "name" : "default"}, {"class_name" : "Snapshot", "name" : "slice", "interval" : {"finish_seconds_utc" : %d}}]}`); + )", + Now().Seconds()); + auto session = tableClient.CreateSession().GetValueSync().GetSession(); + auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), NYdb::EStatus::SUCCESS, alterResult.GetIssues().ToString()); + } + csController->WaitCompactions(TDuration::Seconds(10)); + { + auto it = tableClient + .StreamExecuteScanQuery(R"( + --!syntax_v1 + SELECT COUNT(*) + FROM `/Root/olapStore/olapTable` + )") + .GetValueSync(); + UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString()); + TString result = StreamResultToYson(it); + Cout << result << Endl; + CompareYson(result, R"([[109000u;]])"); + } + + { + auto it = tableClient + .StreamExecuteScanQuery(R"( + --!syntax_v1 + SELECT CAST(JSON_VALUE(CAST(Details AS JsonDocument), "$.level") AS Uint64) AS LEVEL, Details + FROM `/Root/olapStore/olapTable/.sys/primary_index_optimizer_stats` + ORDER BY LEVEL + )") + .GetValueSync(); + UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString()); + TString result = StreamResultToYson(it); + Cerr << result << Endl; + } + + { + auto it = tableClient + .StreamExecuteScanQuery(R"( + --!syntax_v1 + SELECT CAST(JSON_VALUE(CAST(Details AS JsonDocument), "$.level") AS Uint64) AS LEVEL, + CAST(JSON_VALUE(CAST(Details AS JsonDocument), "$.selectivity.default.records_count") AS Uint64) AS RECORDS_COUNT_DEFAULT, + CAST(JSON_VALUE(CAST(Details AS JsonDocument), "$.selectivity.slice.records_count") AS Uint64) AS RECORDS_COUNT_SLICE + FROM `/Root/olapStore/olapTable/.sys/primary_index_optimizer_stats` + ORDER BY LEVEL + )") + .GetValueSync(); + UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString()); + auto rows = CollectRows(it); + for (auto&& i : rows) { + Cerr << GetUint64(i.at("LEVEL")) << "/" << GetUint64(i.at("RECORDS_COUNT_DEFAULT")) << "/" << GetUint64(i.at("RECORDS_COUNT_SLICE")) << Endl; + } + AFL_VERIFY(0 == GetUint64(rows[0].at("LEVEL"))); + AFL_VERIFY(GetUint64(rows[0].at("RECORDS_COUNT_DEFAULT")) == 0); + AFL_VERIFY(1 == GetUint64(rows[1].at("LEVEL"))); + AFL_VERIFY(GetUint64(rows[1].at("RECORDS_COUNT_DEFAULT")) == 0); + AFL_VERIFY(2 == GetUint64(rows[2].at("LEVEL"))); + AFL_VERIFY(GetUint64(rows[2].at("RECORDS_COUNT_SLICE")) == 109000); + } + } Y_UNIT_TEST(MultiLayersOptimization) { auto settings = TKikimrSettings().SetWithSampleTables(false); TKikimrRunner kikimr(settings); @@ -70,7 +211,7 @@ Y_UNIT_TEST_SUITE(KqpOlapOptimizer) { auto it = tableClient .StreamExecuteScanQuery(R"( --!syntax_v1 - SELECT CAST(JSON_VALUE(CAST(Details AS JsonDocument), "$.level") AS Uint64) AS LEVEL, CAST(JSON_VALUE(CAST(Details AS JsonDocument), "$.portions.blob_bytes") AS Uint64) AS BYTES + SELECT CAST(JSON_VALUE(CAST(Details AS JsonDocument), "$.level") AS Uint64) AS LEVEL, CAST(JSON_VALUE(CAST(Details AS JsonDocument), "$.selectivity.default.blob_bytes") AS Uint64) AS BYTES FROM `/Root/olapStore/olapTable/.sys/primary_index_optimizer_stats` ORDER BY LEVEL )") @@ -82,6 +223,7 @@ Y_UNIT_TEST_SUITE(KqpOlapOptimizer) { for (auto&& i : rows) { AFL_VERIFY(levelIdx == GetUint64(i.at("LEVEL"))); AFL_VERIFY(GetUint64(i.at("BYTES")) < maxVal[levelIdx]); + Cerr << GetUint64(i.at("LEVEL")) << "/" << GetUint64(i.at("BYTES")) << Endl; ++levelIdx; } } diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto index 60775127f2f..56db83b2b5a 100644 --- a/ydb/core/protos/flat_scheme_op.proto +++ b/ydb/core/protos/flat_scheme_op.proto @@ -521,6 +521,7 @@ message TStorageTierConfig { message TCompactionLevelConstructorContainer { optional string ClassName = 1; + optional string DefaultSelectorName = 2; message TZeroLevel { optional uint32 PortionsLiveDurationSeconds = 1; @@ -543,6 +544,33 @@ message TCompactionLevelConstructorContainer { } +message TCompactionSelectorConstructorContainer { + optional string ClassName = 1; + optional string Name = 2; + + message TTransparentSelector { + } + + message TEmptySelector { + } + + message TDataSnapshotInterval { + optional uint32 StartSecondsUTC = 1; + optional uint32 FinishSecondsUTC = 2; + } + + message TSnapshotSelector { + optional TDataSnapshotInterval Interval = 1; + } + + oneof Implementation { + TEmptySelector Empty = 10; + TTransparentSelector Transparent = 11; + TSnapshotSelector Snapshot = 12; + } + +} + message TCompactionPlannerConstructorContainer { optional string ClassName = 1; @@ -557,6 +585,7 @@ message TCompactionPlannerConstructorContainer { message TLCOptimizer { repeated TCompactionLevelConstructorContainer Levels = 1; + repeated TCompactionSelectorConstructorContainer Selectors = 2; } oneof Implementation { diff --git a/ydb/core/tx/columnshard/engines/changes/compaction.cpp b/ydb/core/tx/columnshard/engines/changes/compaction.cpp index 61b9e021dba..b86c5ff28ed 100644 --- a/ydb/core/tx/columnshard/engines/changes/compaction.cpp +++ b/ydb/core/tx/columnshard/engines/changes/compaction.cpp @@ -25,7 +25,10 @@ void TCompactColumnEngineChanges::DoCompile(TFinalizationContext& context) { const TPortionMeta::EProduced producedClassResultCompaction = GetResultProducedClass(); for (auto& portionInfo : AppendedPortions) { - portionInfo.GetPortionConstructor().MutablePortionConstructor().MutableMeta().UpdateRecordsMeta(producedClassResultCompaction); + auto& constructor = portionInfo.GetPortionConstructor().MutablePortionConstructor(); + constructor.MutableMeta().UpdateRecordsMeta(producedClassResultCompaction); + constructor.MutableMeta().SetCompactionLevel(GranuleMeta->GetOptimizerPlanner().GetAppropriateLevel( + GetPortionsToMove().GetTargetCompactionLevel().value_or(0), portionInfo.GetPortionConstructor())); } } diff --git a/ydb/core/tx/columnshard/engines/changes/with_appended.cpp b/ydb/core/tx/columnshard/engines/changes/with_appended.cpp index e4b711ca6df..7006793a71c 100644 --- a/ydb/core/tx/columnshard/engines/changes/with_appended.cpp +++ b/ydb/core/tx/columnshard/engines/changes/with_appended.cpp @@ -79,15 +79,14 @@ void TChangesWithAppend::DoWriteIndexOnComplete(NColumnShard::TColumnShard* self void TChangesWithAppend::DoCompile(TFinalizationContext& context) { AFL_VERIFY(PortionsToRemove.GetSize() + PortionsToMove.GetSize() + AppendedPortions.size() || NoAppendIsCorrect); for (auto&& i : AppendedPortions) { - i.GetPortionConstructor().MutablePortionConstructor().SetPortionId(context.NextPortionId()); - i.GetPortionConstructor().MutablePortionConstructor().MutableMeta().SetCompactionLevel(PortionsToMove.GetTargetCompactionLevel().value_or(0)); + auto& constructor = i.GetPortionConstructor().MutablePortionConstructor(); + constructor.SetPortionId(context.NextPortionId()); + constructor.MutableMeta().SetCompactionLevel(GetPortionsToMove().GetTargetCompactionLevel().value_or(0)); } } void TChangesWithAppend::DoOnAfterCompile() { for (auto&& i : AppendedPortions) { - i.GetPortionConstructor().MutablePortionConstructor().MutableMeta().SetCompactionLevel( - PortionsToMove.GetTargetCompactionLevel().value_or(0)); i.FinalizePortionConstructor(); } } diff --git a/ydb/core/tx/columnshard/engines/portions/constructor_accessor.h b/ydb/core/tx/columnshard/engines/portions/constructor_accessor.h index ae04f95199e..3eb721d2b94 100644 --- a/ydb/core/tx/columnshard/engines/portions/constructor_accessor.h +++ b/ydb/core/tx/columnshard/engines/portions/constructor_accessor.h @@ -144,6 +144,18 @@ public: } + ui64 GetTotalBlobsSize() const { + AFL_VERIFY(Records.size()); + ui64 size = 0; + for (auto&& r : Records) { + size += r.GetBlobRange().GetSize(); + } + for (auto&& r : Indexes) { + size += r.GetDataSize(); + } + return size; + } + static TPortionAccessorConstructor BuildForRewriteBlobs(const TPortionInfo& portion) { return TPortionAccessorConstructor(portion.BuildConstructor(true, false)); } diff --git a/ydb/core/tx/columnshard/engines/portions/constructor_meta.h b/ydb/core/tx/columnshard/engines/portions/constructor_meta.h index ab222ed7e96..fe7de12fe30 100644 --- a/ydb/core/tx/columnshard/engines/portions/constructor_meta.h +++ b/ydb/core/tx/columnshard/engines/portions/constructor_meta.h @@ -37,6 +37,12 @@ public: TPortionMetaConstructor() = default; TPortionMetaConstructor(const TPortionMeta& meta, const bool withBlobs); + ui64 GetTotalBlobBytes() const { + AFL_VERIFY(ColumnBlobBytes); + AFL_VERIFY(IndexBlobBytes); + return *ColumnBlobBytes + *IndexBlobBytes; + } + const TBlobRange RestoreBlobRange(const TBlobRangeLink16& linkRange) const { return linkRange.RestoreRange(GetBlobId(linkRange.GetBlobIdxVerified())); } diff --git a/ydb/core/tx/columnshard/engines/storage/granule/granule.cpp b/ydb/core/tx/columnshard/engines/storage/granule/granule.cpp index ac510197dd5..0165bc43857 100644 --- a/ydb/core/tx/columnshard/engines/storage/granule/granule.cpp +++ b/ydb/core/tx/columnshard/engines/storage/granule/granule.cpp @@ -13,14 +13,10 @@ namespace NKikimr::NOlap { - - void TGranuleMeta::AppendPortion(const std::shared_ptr<TPortionInfo>& info) { - AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "upsert_portion")("portion", info->DebugString())( - "path_id", GetPathId()); + AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "upsert_portion")("portion", info->DebugString())("path_id", GetPathId()); AFL_VERIFY(!Portions.contains(info->GetPortionId())); - AFL_VERIFY(info->GetPathId() == GetPathId())("event", "incompatible_granule")("portion", info->DebugString())( - "path_id", GetPathId()); + AFL_VERIFY(info->GetPathId() == GetPathId())("event", "incompatible_granule")("portion", info->DebugString())("path_id", GetPathId()); AFL_VERIFY(info->ValidSnapshotInfo())("event", "incorrect_portion_snapshots")("portion", info->DebugString()); @@ -133,8 +129,8 @@ const NKikimr::NOlap::TGranuleAdditiveSummary& TGranuleMeta::GetAdditiveSummary( return *AdditiveSummaryCache; } -TGranuleMeta::TGranuleMeta( - const TInternalPathId pathId, const TGranulesStorage& owner, const NColumnShard::TGranuleDataCounters& counters, const TVersionedIndex& versionedIndex) +TGranuleMeta::TGranuleMeta(const TInternalPathId pathId, const TGranulesStorage& owner, const NColumnShard::TGranuleDataCounters& counters, + const TVersionedIndex& versionedIndex) : PathId(pathId) , DataAccessorsManager(owner.GetDataAccessorsManager()) , Counters(counters) @@ -235,11 +231,12 @@ std::shared_ptr<NKikimr::ITxReader> TGranuleMeta::BuildLoader( bool TGranuleMeta::TestingLoad(IDbWrapper& db, const TVersionedIndex& versionedIndex) { TInGranuleConstructors constructors; { - if (!db.LoadPortions(PathId, [&](std::unique_ptr<TPortionInfoConstructor>&& portion, const NKikimrTxColumnShard::TIndexPortionMeta& metaProto) { - const TIndexInfo& indexInfo = portion->GetSchema(versionedIndex)->GetIndexInfo(); - AFL_VERIFY(portion->MutableMeta().LoadMetadata(metaProto, indexInfo, db.GetDsGroupSelectorVerified())); - AFL_VERIFY(constructors.AddConstructorVerified(std::move(portion))); - })) { + if (!db.LoadPortions( + PathId, [&](std::unique_ptr<TPortionInfoConstructor>&& portion, const NKikimrTxColumnShard::TIndexPortionMeta& metaProto) { + const TIndexInfo& indexInfo = portion->GetSchema(versionedIndex)->GetIndexInfo(); + AFL_VERIFY(portion->MutableMeta().LoadMetadata(metaProto, indexInfo, db.GetDsGroupSelectorVerified())); + AFL_VERIFY(constructors.AddConstructorVerified(std::move(portion))); + })) { return false; } } diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.h b/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.h index 20c68212068..62345cf2484 100644 --- a/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.h +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.h @@ -14,6 +14,7 @@ class TColumnEngineChanges; class IStoragesManager; class TGranuleMeta; class TPortionInfo; +class TPortionAccessorConstructor; namespace NDataLocks { class TManager; } @@ -87,7 +88,6 @@ private: virtual bool DoIsOverloaded() const { return false; } - protected: virtual void DoModifyPortions(const THashMap<ui64, std::shared_ptr<TPortionInfo>>& add, const THashMap<ui64, std::shared_ptr<TPortionInfo>>& remove) = 0; @@ -108,6 +108,10 @@ protected: } public: + virtual ui32 GetAppropriateLevel(const ui32 baseLevel, const TPortionAccessorConstructor& /*info*/) const { + return baseLevel; + } + IOptimizerPlanner(const TInternalPathId pathId) : PathId(pathId) { } @@ -195,7 +199,6 @@ private: virtual TConclusion<std::shared_ptr<IOptimizerPlanner>> DoBuildPlanner(const TBuildContext& context) const = 0; virtual void DoSerializeToProto(TProto& proto) const = 0; virtual bool DoDeserializeFromProto(const TProto& proto) = 0; - virtual bool DoIsEqualTo(const IOptimizerPlannerConstructor& item) const = 0; virtual TConclusionStatus DoDeserializeFromJson(const NJson::TJsonValue& jsonInfo) = 0; virtual bool DoApplyToCurrentObject(IOptimizerPlanner& current) const = 0; @@ -230,10 +233,11 @@ public: bool IsEqualTo(const std::shared_ptr<IOptimizerPlannerConstructor>& item) const { AFL_VERIFY(!!item); - if (GetClassName() != item->GetClassName()) { - return false; - } - return DoIsEqualTo(*item); + TProto selfProto; + TProto itemProto; + SerializeToProto(selfProto); + item->SerializeToProto(itemProto); + return selfProto.SerializeAsString() == itemProto.SerializeAsString(); } bool DeserializeFromProto(const TProto& proto) { diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lbuckets/constructor/constructor.cpp b/ydb/core/tx/columnshard/engines/storage/optimizer/lbuckets/constructor/constructor.cpp index 753e30db760..c66583b7647 100644 --- a/ydb/core/tx/columnshard/engines/storage/optimizer/lbuckets/constructor/constructor.cpp +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lbuckets/constructor/constructor.cpp @@ -15,12 +15,6 @@ bool TOptimizerPlannerConstructor::DoApplyToCurrentObject(IOptimizerPlanner& cur return true; } -bool TOptimizerPlannerConstructor::DoIsEqualTo(const IOptimizerPlannerConstructor& item) const { - const auto* itemClass = dynamic_cast<const TOptimizerPlannerConstructor*>(&item); - AFL_VERIFY(itemClass); - return true; -} - void TOptimizerPlannerConstructor::DoSerializeToProto(TProto& proto) const { *proto.MutableLBuckets() = NKikimrSchemeOp::TCompactionPlannerConstructorContainer::TLOptimizer(); } diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lbuckets/constructor/constructor.h b/ydb/core/tx/columnshard/engines/storage/optimizer/lbuckets/constructor/constructor.h index 299fb90ae72..4f9d86b0ab0 100644 --- a/ydb/core/tx/columnshard/engines/storage/optimizer/lbuckets/constructor/constructor.h +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lbuckets/constructor/constructor.h @@ -20,7 +20,6 @@ private: virtual bool DoApplyToCurrentObject(IOptimizerPlanner& current) const override; virtual TConclusion<std::shared_ptr<IOptimizerPlanner>> DoBuildPlanner(const TBuildContext& context) const override; - virtual bool DoIsEqualTo(const IOptimizerPlannerConstructor& item) const override; public: virtual TString GetClassName() const override { return GetClassNameStatic(); diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/constructor.cpp b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/constructor.cpp index 29abc5f4acb..dcf0927a9bf 100644 --- a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/constructor.cpp +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/constructor.cpp @@ -1,32 +1,15 @@ #include "constructor.h" + #include <ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/optimizer.h> namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets { -NKikimr::TConclusion<std::shared_ptr<NKikimr::NOlap::NStorageOptimizer::IOptimizerPlanner>> TOptimizerPlannerConstructor::DoBuildPlanner(const TBuildContext& context) const { - return std::make_shared<TOptimizerPlanner>(context.GetPathId(), context.GetStorages(), context.GetPKSchema(), Levels); +TConclusion<std::shared_ptr<IOptimizerPlanner>> TOptimizerPlannerConstructor::DoBuildPlanner(const TBuildContext& context) const { + return std::make_shared<TOptimizerPlanner>(context.GetPathId(), context.GetStorages(), context.GetPKSchema(), Levels, Selectors); } -bool TOptimizerPlannerConstructor::DoApplyToCurrentObject(IOptimizerPlanner& current) const { - auto* itemClass = dynamic_cast<TOptimizerPlanner*>(¤t); - if (!itemClass) { - return false; - } - return true; -} - -bool TOptimizerPlannerConstructor::DoIsEqualTo(const IOptimizerPlannerConstructor& item) const { - const auto* itemClass = dynamic_cast<const TOptimizerPlannerConstructor*>(&item); - AFL_VERIFY(itemClass); - if (Levels.size() != itemClass->Levels.size()) { - return false; - } - for (ui32 i = 0; i < Levels.size(); ++i) { - if (!Levels[i]->IsEqualTo(*itemClass->Levels[i].GetObjectPtrVerified())) { - return false; - } - } - return true; +bool TOptimizerPlannerConstructor::DoApplyToCurrentObject(IOptimizerPlanner& /*current*/) const { + return false; } void TOptimizerPlannerConstructor::DoSerializeToProto(TProto& proto) const { @@ -34,6 +17,9 @@ void TOptimizerPlannerConstructor::DoSerializeToProto(TProto& proto) const { for (auto&& i : Levels) { *proto.MutableLCBuckets()->AddLevels() = i.SerializeToProto(); } + for (auto&& i : Selectors) { + *proto.MutableLCBuckets()->AddSelectors() = i.SerializeToProto(); + } } bool TOptimizerPlannerConstructor::DoDeserializeFromProto(const TProto& proto) { @@ -49,10 +35,42 @@ bool TOptimizerPlannerConstructor::DoDeserializeFromProto(const TProto& proto) { } Levels.emplace_back(std::move(lContainer)); } + for (auto&& i : proto.GetLCBuckets().GetSelectors()) { + TSelectorConstructorContainer container; + if (!container.DeserializeFromProto(i)) { + AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("error", "cannot parse lc-bucket selector")("proto", i.DebugString()); + return false; + } + Selectors.emplace_back(std::move(container)); + } return true; } NKikimr::TConclusionStatus TOptimizerPlannerConstructor::DoDeserializeFromJson(const NJson::TJsonValue& jsonInfo) { + std::set<TString> selectorNames; + if (jsonInfo.Has("selectors")) { + if (!jsonInfo["selectors"].IsArray()) { + return TConclusionStatus::Fail("selectors have to been array in json description"); + } + auto& arr = jsonInfo["selectors"].GetArray(); + if (!arr.size()) { + return TConclusionStatus::Fail("no objects in json array 'selectors'"); + } + for (auto&& i : arr) { + const auto className = i["class_name"].GetStringRobust(); + auto selector = ISelectorConstructor::TFactory::MakeHolder(className); + if (!selector) { + return TConclusionStatus::Fail("incorrect portions selector class_name: " + className); + } + if (!selector->DeserializeFromJson(i)) { + return TConclusionStatus::Fail("cannot parse portions selector: " + i.GetStringRobust()); + } + Selectors.emplace_back(TSelectorConstructorContainer(std::shared_ptr<ISelectorConstructor>(selector.Release()))); + if (!selectorNames.emplace(Selectors.back()->GetName()).second) { + return TConclusionStatus::Fail("selector name duplication: '" + Selectors.back()->GetName() + "'"); + } + } + } if (!jsonInfo.Has("levels")) { return TConclusionStatus::Fail("no levels description"); } @@ -74,6 +92,15 @@ NKikimr::TConclusionStatus TOptimizerPlannerConstructor::DoDeserializeFromJson(c return TConclusionStatus::Fail("cannot parse level: " + i.GetStringRobust() + "; " + parseConclusion.GetErrorMessage()); } Levels.emplace_back(TLevelConstructorContainer(std::shared_ptr<ILevelConstructor>(level.Release()))); + if (selectorNames.empty()) { + if (Levels.back()->GetDefaultSelectorName() != "default") { + return TConclusionStatus::Fail("incorrect default selector name for level: '" + Levels.back()->GetDefaultSelectorName() + "'"); + } + } else { + if (!selectorNames.contains(Levels.back()->GetDefaultSelectorName())) { + return TConclusionStatus::Fail("unknown default selector name for level: '" + Levels.back()->GetDefaultSelectorName() + "'"); + } + } } return TConclusionStatus::Success(); } diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/constructor.h b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/constructor.h index ece83fc6f89..6872fe817b9 100644 --- a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/constructor.h +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/constructor.h @@ -1,58 +1,11 @@ #pragma once +#include "level/constructor.h" +#include "selector/constructor.h" + #include <ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.h> -#include <ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/counters.h> -#include <ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/abstract.h> namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets { -class ILevelConstructor { -private: - virtual std::shared_ptr<IPortionsLevel> DoBuildLevel(const std::shared_ptr<IPortionsLevel>& nextLevel, const ui32 indexLevel, - const std::shared_ptr<TSimplePortionsGroupInfo>& portionsInfo, const TLevelCounters& counters) const = 0; - virtual TConclusionStatus DoDeserializeFromJson(const NJson::TJsonValue& json) = 0; - virtual bool DoDeserializeFromProto(const NKikimrSchemeOp::TCompactionLevelConstructorContainer& proto) = 0; - virtual void DoSerializeToProto(NKikimrSchemeOp::TCompactionLevelConstructorContainer& proto) const = 0; - virtual bool IsEqualToSameClass(const ILevelConstructor& item) const = 0; - -public: - using TFactory = NObjectFactory::TObjectFactory<ILevelConstructor, TString>; - using TProto = NKikimrSchemeOp::TCompactionLevelConstructorContainer; - - virtual ~ILevelConstructor() = default; - - bool IsEqualTo(const ILevelConstructor& item) const { - if (GetClassName() != item.GetClassName()) { - return false; - } - return IsEqualToSameClass(item); - } - - std::shared_ptr<IPortionsLevel> BuildLevel(const std::shared_ptr<IPortionsLevel>& nextLevel, const ui32 indexLevel, - const std::shared_ptr<TSimplePortionsGroupInfo>& portionsInfo, const TLevelCounters& counters) const { - return DoBuildLevel(nextLevel, indexLevel, portionsInfo, counters); - } - - TConclusionStatus DeserializeFromJson(const NJson::TJsonValue& json) { - return DoDeserializeFromJson(json); - } - - bool DeserializeFromProto(const TProto& proto) { - return DoDeserializeFromProto(proto); - } - void SerializeToProto(NKikimrSchemeOp::TCompactionLevelConstructorContainer& proto) const { - return DoSerializeToProto(proto); - } - virtual TString GetClassName() const = 0; -}; - -class TLevelConstructorContainer: public NBackgroundTasks::TInterfaceProtoContainer<ILevelConstructor> { -private: - using TBase = NBackgroundTasks::TInterfaceProtoContainer<ILevelConstructor>; - -public: - using TBase::TBase; -}; - class TOptimizerPlannerConstructor: public IOptimizerPlannerConstructor { public: static TString GetClassNameStatic() { @@ -61,6 +14,7 @@ public: private: std::vector<TLevelConstructorContainer> Levels; + std::vector<TSelectorConstructorContainer> Selectors; static inline const TFactory::TRegistrator<TOptimizerPlannerConstructor> Registrator = TFactory::TRegistrator<TOptimizerPlannerConstructor>(GetClassNameStatic()); @@ -72,7 +26,6 @@ private: virtual bool DoApplyToCurrentObject(IOptimizerPlanner& current) const override; virtual TConclusion<std::shared_ptr<IOptimizerPlanner>> DoBuildPlanner(const TBuildContext& context) const override; - virtual bool DoIsEqualTo(const IOptimizerPlannerConstructor& item) const override; public: virtual TString GetClassName() const override { diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/level/constructor.cpp b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/level/constructor.cpp new file mode 100644 index 00000000000..1f9801b0d23 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/level/constructor.cpp @@ -0,0 +1,4 @@ +#include "constructor.h" + +namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets { +} // namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/level/constructor.h b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/level/constructor.h new file mode 100644 index 00000000000..0d76ced744a --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/level/constructor.h @@ -0,0 +1,81 @@ +#pragma once +#include <ydb/core/protos/flat_scheme_op.pb.h> +#include <ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/abstract.h> +#include <ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/counters.h> + +#include <ydb/services/bg_tasks/abstract/interface.h> + +#include <library/cpp/json/writer/json_value.h> + +namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets { + +class ILevelConstructor { +private: + YDB_READONLY(TString, DefaultSelectorName, "default"); + virtual std::shared_ptr<IPortionsLevel> DoBuildLevel(const std::shared_ptr<IPortionsLevel>& nextLevel, const ui32 indexLevel, + const std::shared_ptr<TSimplePortionsGroupInfo>& portionsInfo, const TLevelCounters& counters, + const std::vector<std::shared_ptr<IPortionsSelector>>& selectors) const = 0; + virtual TConclusionStatus DoDeserializeFromJson(const NJson::TJsonValue& json) = 0; + virtual bool DoDeserializeFromProto(const NKikimrSchemeOp::TCompactionLevelConstructorContainer& proto) = 0; + virtual void DoSerializeToProto(NKikimrSchemeOp::TCompactionLevelConstructorContainer& proto) const = 0; + +public: + using TFactory = NObjectFactory::TObjectFactory<ILevelConstructor, TString>; + using TProto = NKikimrSchemeOp::TCompactionLevelConstructorContainer; + + virtual ~ILevelConstructor() = default; + + bool IsEqualTo(const ILevelConstructor& item) const { + return SerializeToProto().SerializeAsString() == item.SerializeToProto().SerializeAsString(); + } + + std::shared_ptr<IPortionsLevel> BuildLevel(const std::shared_ptr<IPortionsLevel>& nextLevel, const ui32 indexLevel, + const std::shared_ptr<TSimplePortionsGroupInfo>& portionsInfo, const TLevelCounters& counters, + const std::vector<std::shared_ptr<IPortionsSelector>>& selectors) const { + return DoBuildLevel(nextLevel, indexLevel, portionsInfo, counters, selectors); + } + + TConclusionStatus DeserializeFromJson(const NJson::TJsonValue& json) { + if (json.Has("default_selector_name")) { + if (!json["default_selector_name"].IsString()) { + return TConclusionStatus::Fail("default_selector_name have to be string"); + } + if (!json["default_selector_name"].GetString()) { + return TConclusionStatus::Fail("default_selector_name have to be not empty string"); + } + DefaultSelectorName = json["default_selector_name"].GetString(); + } + return DoDeserializeFromJson(json); + } + + bool DeserializeFromProto(const TProto& proto) { + if (proto.HasDefaultSelectorName()) { + DefaultSelectorName = proto.GetDefaultSelectorName(); + } else { + DefaultSelectorName = "default"; + } + return DoDeserializeFromProto(proto); + } + void SerializeToProto(NKikimrSchemeOp::TCompactionLevelConstructorContainer& proto) const { + if (DefaultSelectorName != "default") { + proto.SetDefaultSelectorName(DefaultSelectorName); + } + return DoSerializeToProto(proto); + } + NKikimrSchemeOp::TCompactionLevelConstructorContainer SerializeToProto() const { + NKikimrSchemeOp::TCompactionLevelConstructorContainer result; + SerializeToProto(result); + return result; + } + virtual TString GetClassName() const = 0; +}; + +class TLevelConstructorContainer: public NBackgroundTasks::TInterfaceProtoContainer<ILevelConstructor> { +private: + using TBase = NBackgroundTasks::TInterfaceProtoContainer<ILevelConstructor>; + +public: + using TBase::TBase; +}; + +} // namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/one_layer.cpp b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/level/one_layer.cpp index bb252c4c5a4..3d80b4de5df 100644 --- a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/one_layer.cpp +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/level/one_layer.cpp @@ -1,6 +1,6 @@ #include "one_layer.h" -#include <ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/common_level.h> +#include <ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/common_level.h> namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets { @@ -65,9 +65,9 @@ void TOneLayerConstructor::DoSerializeToProto(NKikimrSchemeOp::TCompactionLevelC std::shared_ptr<NStorageOptimizer::NLCBuckets::IPortionsLevel> TOneLayerConstructor::DoBuildLevel( const std::shared_ptr<IPortionsLevel>& nextLevel, const ui32 indexLevel, const std::shared_ptr<TSimplePortionsGroupInfo>& portionsInfo, - const TLevelCounters& counters) const { + const TLevelCounters& counters, const std::vector<std::shared_ptr<IPortionsSelector>>& selectors) const { return std::make_shared<TOneLayerPortions>(indexLevel, BytesLimitFraction.value_or(1), ExpectedPortionSize.value_or(2 << 20), nextLevel, - portionsInfo, counters, SizeLimitGuarantee); + portionsInfo, counters, SizeLimitGuarantee, selectors, GetDefaultSelectorName()); } } // namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/one_layer.h b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/level/one_layer.h index e128962c3f2..37367cb3243 100644 --- a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/one_layer.h +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/level/one_layer.h @@ -15,14 +15,11 @@ private: ui64 SizeLimitGuarantee = 0; virtual std::shared_ptr<IPortionsLevel> DoBuildLevel(const std::shared_ptr<IPortionsLevel>& nextLevel, const ui32 indexLevel, - const std::shared_ptr<TSimplePortionsGroupInfo>& portionsInfo, const TLevelCounters& counters) const override; + const std::shared_ptr<TSimplePortionsGroupInfo>& portionsInfo, const TLevelCounters& counters, + const std::vector<std::shared_ptr<IPortionsSelector>>& selectors) const override; virtual TConclusionStatus DoDeserializeFromJson(const NJson::TJsonValue& json) override; virtual bool DoDeserializeFromProto(const NKikimrSchemeOp::TCompactionLevelConstructorContainer& proto) override; virtual void DoSerializeToProto(NKikimrSchemeOp::TCompactionLevelConstructorContainer& proto) const override; - virtual bool IsEqualToSameClass(const ILevelConstructor& item) const override { - const auto& itemCast = dynamic_cast<const TOneLayerConstructor&>(item); - return BytesLimitFraction == itemCast.BytesLimitFraction && ExpectedPortionSize == itemCast.ExpectedPortionSize; - } static const inline TFactory::TRegistrator<TOneLayerConstructor> Registrator = TFactory::TRegistrator<TOneLayerConstructor>(GetClassNameStatic()); diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/level/ya.make b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/level/ya.make new file mode 100644 index 00000000000..84d55e6557e --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/level/ya.make @@ -0,0 +1,16 @@ +LIBRARY() + +SRCS( + constructor.cpp + GLOBAL zero_level.cpp + GLOBAL one_layer.cpp +) + +PEERDIR( + contrib/libs/apache/arrow + ydb/core/protos + ydb/core/formats/arrow + ydb/core/tx/columnshard/engines/changes/abstract +) + +END() diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/zero_level.cpp b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/level/zero_level.cpp index 2ad81f0dbe4..a1c09c28b5f 100644 --- a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/zero_level.cpp +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/level/zero_level.cpp @@ -1,6 +1,6 @@ #include "zero_level.h" -#include <ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/zero_level.h> +#include <ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/zero_level.h> namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets { @@ -54,47 +54,50 @@ bool TZeroLevelConstructor::DoDeserializeFromProto(const NKikimrSchemeOp::TCompa if (!proto.HasZeroLevel()) { return true; } - if (proto.GetZeroLevel().HasPortionsLiveDurationSeconds()) { - PortionsLiveDuration = TDuration::Seconds(proto.GetZeroLevel().GetPortionsLiveDurationSeconds()); + auto& pLevel = proto.GetZeroLevel(); + if (pLevel.HasPortionsLiveDurationSeconds()) { + PortionsLiveDuration = TDuration::Seconds(pLevel.GetPortionsLiveDurationSeconds()); } - if (proto.GetZeroLevel().HasExpectedBlobsSize()) { - ExpectedBlobsSize = proto.GetZeroLevel().GetExpectedBlobsSize(); + if (pLevel.HasExpectedBlobsSize()) { + ExpectedBlobsSize = pLevel.GetExpectedBlobsSize(); } - if (proto.GetZeroLevel().HasPortionsCountAvailable()) { - PortionsCountAvailable = proto.GetZeroLevel().GetPortionsCountAvailable(); + if (pLevel.HasPortionsCountAvailable()) { + PortionsCountAvailable = pLevel.GetPortionsCountAvailable(); } - if (proto.GetZeroLevel().HasPortionsCountLimit()) { - PortionsCountLimit = proto.GetZeroLevel().GetPortionsCountLimit(); + if (pLevel.HasPortionsCountLimit()) { + PortionsCountLimit = pLevel.GetPortionsCountLimit(); } - if (proto.GetZeroLevel().HasPortionsSizeLimit()) { - PortionsSizeLimit = proto.GetZeroLevel().GetPortionsSizeLimit(); + if (pLevel.HasPortionsSizeLimit()) { + PortionsSizeLimit = pLevel.GetPortionsSizeLimit(); } return true; } void TZeroLevelConstructor::DoSerializeToProto(NKikimrSchemeOp::TCompactionLevelConstructorContainer& proto) const { + auto& mLevel = *proto.MutableZeroLevel(); if (PortionsLiveDuration) { - proto.MutableZeroLevel()->SetPortionsLiveDurationSeconds(PortionsLiveDuration->Seconds()); + mLevel.SetPortionsLiveDurationSeconds(PortionsLiveDuration->Seconds()); } if (ExpectedBlobsSize) { - proto.MutableZeroLevel()->SetExpectedBlobsSize(*ExpectedBlobsSize); + mLevel.SetExpectedBlobsSize(*ExpectedBlobsSize); } if (PortionsCountAvailable) { - proto.MutableZeroLevel()->SetPortionsCountAvailable(*PortionsCountAvailable); + mLevel.SetPortionsCountAvailable(*PortionsCountAvailable); } if (PortionsCountLimit) { - proto.MutableZeroLevel()->SetPortionsCountLimit(*PortionsCountLimit); + mLevel.SetPortionsCountLimit(*PortionsCountLimit); } if (PortionsSizeLimit) { - proto.MutableZeroLevel()->SetPortionsSizeLimit(*PortionsSizeLimit); + mLevel.SetPortionsSizeLimit(*PortionsSizeLimit); } } std::shared_ptr<NKikimr::NOlap::NStorageOptimizer::NLCBuckets::IPortionsLevel> TZeroLevelConstructor::DoBuildLevel( const std::shared_ptr<IPortionsLevel>& nextLevel, const ui32 indexLevel, const std::shared_ptr<TSimplePortionsGroupInfo>& /*portionsInfo*/, - const TLevelCounters& counters) const { - return std::make_shared<TZeroLevelPortions>(indexLevel, nextLevel, counters, PortionsLiveDuration.value_or(TDuration::Max()), - ExpectedBlobsSize.value_or((ui64)1 << 20), PortionsCountAvailable.value_or(10), PortionsCountLimit, PortionsSizeLimit); + const TLevelCounters& counters, const std::vector<std::shared_ptr<IPortionsSelector>>& selectors) const { + return std::make_shared<TZeroLevelPortions>(indexLevel, nextLevel, counters, + std::make_shared<TLimitsOverloadChecker>(PortionsCountLimit, PortionsSizeLimit), PortionsLiveDuration.value_or(TDuration::Max()), + ExpectedBlobsSize.value_or((ui64)1 << 20), PortionsCountAvailable.value_or(10), selectors, GetDefaultSelectorName()); } } // namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/zero_level.h b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/level/zero_level.h index 45ed3c41a11..4fb8cda439b 100644 --- a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/zero_level.h +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/level/zero_level.h @@ -17,16 +17,11 @@ private: std::optional<ui64> PortionsSizeLimit; virtual std::shared_ptr<IPortionsLevel> DoBuildLevel(const std::shared_ptr<IPortionsLevel>& nextLevel, const ui32 indexLevel, - const std::shared_ptr<TSimplePortionsGroupInfo>& portionsInfo, const TLevelCounters& counters) const override; + const std::shared_ptr<TSimplePortionsGroupInfo>& portionsInfo, const TLevelCounters& counters, + const std::vector<std::shared_ptr<IPortionsSelector>>& selectors) const override; virtual TConclusionStatus DoDeserializeFromJson(const NJson::TJsonValue& json) override; virtual bool DoDeserializeFromProto(const NKikimrSchemeOp::TCompactionLevelConstructorContainer& proto) override; virtual void DoSerializeToProto(NKikimrSchemeOp::TCompactionLevelConstructorContainer& proto) const override; - virtual bool IsEqualToSameClass(const ILevelConstructor& item) const override { - const auto& itemCast = dynamic_cast<const TZeroLevelConstructor&>(item); - return PortionsLiveDuration == itemCast.PortionsLiveDuration && ExpectedBlobsSize == itemCast.ExpectedBlobsSize && - PortionsCountAvailable == itemCast.PortionsCountAvailable && PortionsCountLimit == itemCast.PortionsCountLimit && - PortionsSizeLimit == itemCast.PortionsSizeLimit; - } static const inline TFactory::TRegistrator<TZeroLevelConstructor> Registrator = TFactory::TRegistrator<TZeroLevelConstructor>(GetClassNameStatic()); diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/selector/constructor.cpp b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/selector/constructor.cpp new file mode 100644 index 00000000000..2796ac2563e --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/selector/constructor.cpp @@ -0,0 +1,5 @@ +#include "constructor.h" + +namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets { + +} // namespace NKikimr::NOlap::NStorageOptimizer::NLBuckets diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/selector/constructor.h b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/selector/constructor.h new file mode 100644 index 00000000000..21c66a399c5 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/selector/constructor.h @@ -0,0 +1,74 @@ +#pragma once +#include <ydb/core/protos/flat_scheme_op.pb.h> +#include <ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/selector/abstract.h> + +#include <ydb/services/bg_tasks/abstract/interface.h> + +#include <library/cpp/json/writer/json_value.h> + +namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets { + +class ISelectorConstructor { +private: + YDB_READONLY_DEF(TString, Name); + virtual std::shared_ptr<IPortionsSelector> DoBuildSelector() const = 0; + virtual TConclusionStatus DoDeserializeFromJson(const NJson::TJsonValue& json) = 0; + virtual bool DoDeserializeFromProto(const NKikimrSchemeOp::TCompactionSelectorConstructorContainer& proto) = 0; + virtual void DoSerializeToProto(NKikimrSchemeOp::TCompactionSelectorConstructorContainer& proto) const = 0; + +public: + using TFactory = NObjectFactory::TObjectFactory<ISelectorConstructor, TString>; + using TProto = NKikimrSchemeOp::TCompactionSelectorConstructorContainer; + + virtual ~ISelectorConstructor() = default; + + bool IsEqualTo(const ISelectorConstructor& item) const { + return SerializeToProto().SerializeAsString() == item.SerializeToProto().SerializeAsString(); + } + + std::shared_ptr<IPortionsSelector> BuildSelector() const { + return DoBuildSelector(); + } + + TConclusionStatus DeserializeFromJson(const NJson::TJsonValue& json) { + if (json.Has("name")) { + if (json["name"].IsString()) { + Name = json["name"].GetStringSafe(); + } else { + return TConclusionStatus::Fail("name field have to be string"); + } + } else { + Name = "default"; + } + return DoDeserializeFromJson(json); + } + + bool DeserializeFromProto(const TProto& proto) { + Name = proto.GetName(); + if (!Name) { + AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "cannot parse proto selector constructor")("reason", "empty name"); + return false; + } + return DoDeserializeFromProto(proto); + } + void SerializeToProto(NKikimrSchemeOp::TCompactionSelectorConstructorContainer& proto) const { + proto.SetName(Name); + return DoSerializeToProto(proto); + } + NKikimrSchemeOp::TCompactionSelectorConstructorContainer SerializeToProto() const { + NKikimrSchemeOp::TCompactionSelectorConstructorContainer result; + SerializeToProto(result); + return result; + } + virtual TString GetClassName() const = 0; +}; + +class TSelectorConstructorContainer: public NBackgroundTasks::TInterfaceProtoContainer<ISelectorConstructor> { +private: + using TBase = NBackgroundTasks::TInterfaceProtoContainer<ISelectorConstructor>; + +public: + using TBase::TBase; +}; + +} // namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/selector/empty.cpp b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/selector/empty.cpp new file mode 100644 index 00000000000..2e12585987b --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/selector/empty.cpp @@ -0,0 +1,26 @@ +#include "empty.h" + +#include <ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/selector/empty.h> + +namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets { + +std::shared_ptr<IPortionsSelector> TEmptySelectorConstructor::DoBuildSelector() const { + return std::make_shared<TEmptyPortionsSelector>(GetName()); +} + +TConclusionStatus TEmptySelectorConstructor::DoDeserializeFromJson(const NJson::TJsonValue& /*json*/) { + return TConclusionStatus::Success(); +} + +bool TEmptySelectorConstructor::DoDeserializeFromProto(const NKikimrSchemeOp::TCompactionSelectorConstructorContainer& proto) { + if (!proto.HasEmpty()) { + return false; + } + return true; +} + +void TEmptySelectorConstructor::DoSerializeToProto(NKikimrSchemeOp::TCompactionSelectorConstructorContainer& proto) const { + *proto.MutableEmpty() = NKikimrSchemeOp::TCompactionSelectorConstructorContainer::TEmptySelector(); +} + +} // namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/selector/empty.h b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/selector/empty.h new file mode 100644 index 00000000000..50bb43b9983 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/selector/empty.h @@ -0,0 +1,26 @@ +#pragma once +#include "constructor.h" + +namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets { + +class TEmptySelectorConstructor: public ISelectorConstructor { +public: + static TString GetClassNameStatic() { + return "Empty"; + } + +private: + virtual std::shared_ptr<IPortionsSelector> DoBuildSelector() const override; + virtual TConclusionStatus DoDeserializeFromJson(const NJson::TJsonValue& json) override; + virtual bool DoDeserializeFromProto(const NKikimrSchemeOp::TCompactionSelectorConstructorContainer& proto) override; + virtual void DoSerializeToProto(NKikimrSchemeOp::TCompactionSelectorConstructorContainer& proto) const override; + + const static inline auto Registrator = TFactory::TRegistrator<TEmptySelectorConstructor>(GetClassNameStatic()); + +public: + virtual TString GetClassName() const override { + return GetClassNameStatic(); + } +}; + +} // namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/selector/snapshot.cpp b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/selector/snapshot.cpp new file mode 100644 index 00000000000..6428d5987a9 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/selector/snapshot.cpp @@ -0,0 +1,37 @@ +#include "snapshot.h" + +#include <ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/selector/snapshot.h> + +namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets { + +std::shared_ptr<IPortionsSelector> TSnapshotSelectorConstructor::DoBuildSelector() const { + return std::make_shared<TSnapshotPortionsSelector>(Interval, GetName()); +} + +TConclusionStatus TSnapshotSelectorConstructor::DoDeserializeFromJson(const NJson::TJsonValue& jsonValue) { + if (jsonValue.Has("interval")) { + auto conclusion = Interval.DeserializeFromJson(jsonValue["interval"]); + if (conclusion.IsFail()) { + return conclusion; + } + } + return TConclusionStatus::Success(); +} + +bool TSnapshotSelectorConstructor::DoDeserializeFromProto(const NKikimrSchemeOp::TCompactionSelectorConstructorContainer& proto) { + if (!proto.HasSnapshot()) { + return false; + } + auto conclusion = Interval.DeserializeFromProto(proto.GetSnapshot().GetInterval()); + if (conclusion.IsFail()) { + AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "cannot parse snapshot selector interval")("reason", conclusion.GetErrorMessage()); + return false; + } + return true; +} + +void TSnapshotSelectorConstructor::DoSerializeToProto(NKikimrSchemeOp::TCompactionSelectorConstructorContainer& proto) const { + *proto.MutableSnapshot()->MutableInterval() = Interval.SerializeToProto(); +} + +} // namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/selector/snapshot.h b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/selector/snapshot.h new file mode 100644 index 00000000000..a5a9b7e4c1f --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/selector/snapshot.h @@ -0,0 +1,30 @@ +#pragma once +#include "constructor.h" + +#include <ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/selector/snapshot.h> + +namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets { + +class TSnapshotSelectorConstructor: public ISelectorConstructor { +public: + static TString GetClassNameStatic() { + return "Snapshot"; + } + +private: + TDataSnapshotInterval Interval; + + virtual std::shared_ptr<IPortionsSelector> DoBuildSelector() const override; + virtual TConclusionStatus DoDeserializeFromJson(const NJson::TJsonValue& json) override; + virtual bool DoDeserializeFromProto(const NKikimrSchemeOp::TCompactionSelectorConstructorContainer& proto) override; + virtual void DoSerializeToProto(NKikimrSchemeOp::TCompactionSelectorConstructorContainer& proto) const override; + + const static inline auto Registrator = TFactory::TRegistrator<TSnapshotSelectorConstructor>(GetClassNameStatic()); + +public: + virtual TString GetClassName() const override { + return GetClassNameStatic(); + } +}; + +} // namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/selector/transparent.cpp b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/selector/transparent.cpp new file mode 100644 index 00000000000..1555456f57a --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/selector/transparent.cpp @@ -0,0 +1,26 @@ +#include "transparent.h" + +#include <ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/selector/transparent.h> + +namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets { + +std::shared_ptr<IPortionsSelector> TTransparentSelectorConstructor::DoBuildSelector() const { + return std::make_shared<TTransparentPortionsSelector>(GetName()); +} + +TConclusionStatus TTransparentSelectorConstructor::DoDeserializeFromJson(const NJson::TJsonValue& /*json*/) { + return TConclusionStatus::Success(); +} + +bool TTransparentSelectorConstructor::DoDeserializeFromProto(const NKikimrSchemeOp::TCompactionSelectorConstructorContainer& proto) { + if (!proto.HasTransparent()) { + return false; + } + return true; +} + +void TTransparentSelectorConstructor::DoSerializeToProto(NKikimrSchemeOp::TCompactionSelectorConstructorContainer& proto) const { + *proto.MutableTransparent() = NKikimrSchemeOp::TCompactionSelectorConstructorContainer::TTransparentSelector(); +} + +} // namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/selector/transparent.h b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/selector/transparent.h new file mode 100644 index 00000000000..53baadaef13 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/selector/transparent.h @@ -0,0 +1,26 @@ +#pragma once +#include "constructor.h" + +namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets { + +class TTransparentSelectorConstructor: public ISelectorConstructor { +public: + static TString GetClassNameStatic() { + return "Transparent"; + } + +private: + virtual std::shared_ptr<IPortionsSelector> DoBuildSelector() const override; + virtual TConclusionStatus DoDeserializeFromJson(const NJson::TJsonValue& json) override; + virtual bool DoDeserializeFromProto(const NKikimrSchemeOp::TCompactionSelectorConstructorContainer& proto) override; + virtual void DoSerializeToProto(NKikimrSchemeOp::TCompactionSelectorConstructorContainer& proto) const override; + + const static inline auto Registrator = TFactory::TRegistrator<TTransparentSelectorConstructor>(GetClassNameStatic()); + +public: + virtual TString GetClassName() const override { + return GetClassNameStatic(); + } +}; + +} // namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/selector/ya.make b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/selector/ya.make new file mode 100644 index 00000000000..874ee0a40e2 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/selector/ya.make @@ -0,0 +1,17 @@ +LIBRARY() + +SRCS( + constructor.cpp + GLOBAL empty.cpp + GLOBAL transparent.cpp + GLOBAL snapshot.cpp +) + +PEERDIR( + contrib/libs/apache/arrow + ydb/core/protos + ydb/core/formats/arrow + ydb/core/tx/columnshard/engines/changes/abstract +) + +END() diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/ya.make b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/ya.make index a06b6cdb0be..eeb97ce4615 100644 --- a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/ya.make +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/ya.make @@ -2,8 +2,6 @@ LIBRARY() SRCS( GLOBAL constructor.cpp - GLOBAL zero_level.cpp - GLOBAL one_layer.cpp ) PEERDIR( @@ -11,6 +9,8 @@ PEERDIR( ydb/core/protos ydb/core/formats/arrow ydb/core/tx/columnshard/engines/changes/abstract + ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/selector + ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/level ) END() diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/accumulation_level.h b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/accumulation_level.h deleted file mode 100644 index 4718a94c93c..00000000000 --- a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/accumulation_level.h +++ /dev/null @@ -1,115 +0,0 @@ -#pragma once -#include "abstract.h" -#include "counters.h" - -namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets { - -class TAccumulationLevelPortions: public IPortionsLevel { -private: - using TBase = IPortionsLevel; - const TLevelCounters LevelCounters; - - std::set<TOrderedPortion> Portions; - - virtual std::optional<TPortionsChain> DoGetAffectedPortions( - const NArrow::TSimpleRow& /*from*/, const NArrow::TSimpleRow& /*to*/) const override { - return std::nullopt; - } - - virtual ui64 DoGetAffectedPortionBytes(const NArrow::TSimpleRow& /*from*/, const NArrow::TSimpleRow& /*to*/) const override { - return 0; - } - - virtual ui64 DoGetWeight() const override { - if (PortionsInfo.GetCount() <= 1) { - return 0; - } - - THashSet<ui64> portionIds; - ui64 affectedRawBytes = 0; - auto chain = - NextLevel->GetAffectedPortions(Portions.begin()->GetPortion()->IndexKeyStart(), Portions.rbegin()->GetPortion()->IndexKeyEnd()); - if (chain) { - auto it = Portions.begin(); - auto itNext = chain->GetPortions().begin(); - while (it != Portions.end() && itNext != chain->GetPortions().end()) { - const auto& nextLevelPortion = *itNext; - if (nextLevelPortion->IndexKeyEnd() < it->GetPortion()->IndexKeyStart()) { - ++itNext; - } else if (it->GetPortion()->IndexKeyEnd() < nextLevelPortion->IndexKeyStart()) { - ++it; - } else { - if (portionIds.emplace(nextLevelPortion->GetPortionId()).second) { - affectedRawBytes += nextLevelPortion->GetTotalRawBytes(); - } - ++itNext; - } - } - } - const ui64 mb = ((affectedRawBytes + PortionsInfo.GetRawBytes()) >> 20) + 1; - return 1000.0 * PortionsInfo.GetCount() * PortionsInfo.GetCount() / mb; - } - - virtual TInstant DoGetWeightExpirationInstant() const override { - return TInstant::Max(); - } - -public: - TAccumulationLevelPortions(const ui64 levelId, const std::shared_ptr<IPortionsLevel>& nextLevel, const TLevelCounters& levelCounters) - : TBase(levelId, nextLevel) - , LevelCounters(levelCounters) { - } - - virtual bool IsLocked(const std::shared_ptr<NDataLocks::TManager>& locksManager) const override { - for (auto&& i : Portions) { - if (locksManager->IsLocked(*i.GetPortion(), NDataLocks::ELockCategory::Compaction)) { - return true; - } - } - return false; - } - - virtual void DoModifyPortions(const std::vector<TPortionInfo::TPtr>& add, const std::vector<TPortionInfo::TPtr>& remove) override { - for (auto&& i : remove) { - auto it = Portions.find(i); - AFL_VERIFY(it != Portions.end()); - AFL_VERIFY(it->GetPortion()->GetPortionId() == i->GetPortionId()); - PortionsInfo.RemovePortion(i); - Portions.erase(it); - LevelCounters.Portions->RemovePortion(i); - } - for (auto&& i : add) { - AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "add_accum")("portion_id", i->GetPortionId())( - "blob_size", i->GetTotalBlobBytes()); - AFL_VERIFY(Portions.emplace(i).second); - PortionsInfo.AddPortion(i); - LevelCounters.Portions->AddPortion(i); - } - } - - virtual TCompactionTaskData DoGetOptimizationTask() const override { - AFL_VERIFY(Portions.size()); - std::shared_ptr<IPortionsLevel> targetLevel = GetNextLevel(); - AFL_VERIFY(targetLevel); - TCompactionTaskData result(targetLevel->GetLevelId()); - { - for (auto&& i : Portions) { - result.AddCurrentLevelPortion( - i.GetPortion(), targetLevel->GetAffectedPortions(i.GetPortion()->IndexKeyStart(), i.GetPortion()->IndexKeyEnd()), true); - if (!result.CanTakeMore()) { - result.SetStopSeparation(i.GetPortion()->IndexKeyStart()); - break; - } - } - } - return result; - } - - virtual NArrow::NMerger::TIntervalPositions DoGetBucketPositions(const std::shared_ptr<arrow::Schema>& /*pkSchema*/) const override { - AFL_VERIFY(false); - NArrow::NMerger::TIntervalPositions result; - return result; - } -}; - -} // namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/abstract.cpp b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/abstract.cpp index 84e224f68a5..84e224f68a5 100644 --- a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/abstract.cpp +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/abstract.cpp diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/abstract.h b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/abstract.h index 401de122d90..1a5579c3df4 100644 --- a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/abstract.h +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/abstract.h @@ -3,8 +3,10 @@ #include <ydb/core/tx/columnshard/engines/changes/general_compaction.h> #include <ydb/core/tx/columnshard/engines/portions/portion_info.h> +#include <ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/selector/abstract.h> #include <ydb/library/formats/arrow/replace_key.h> +#include <ydb/services/bg_tasks/abstract/interface.h> namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets { @@ -12,8 +14,6 @@ class TOrderedPortion { private: TPortionInfo::TConstPtr Portion; NArrow::TSimpleRow Start; - ui64 PortionId; - NArrow::TSimpleRow StartPosition; public: const TPortionInfo::TConstPtr& GetPortion() const { @@ -25,23 +25,14 @@ public: return Start; } - const NArrow::TSimpleRow& GetStartPosition() const { - AFL_VERIFY(Portion); - return StartPosition; - } - TOrderedPortion(const TPortionInfo::TConstPtr& portion) : Portion(portion) - , Start(portion->IndexKeyStart()) - , PortionId(portion->GetPortionId()) - , StartPosition(Portion->IndexKeyStart()) { + , Start(portion->IndexKeyStart()) { } TOrderedPortion(const TPortionInfo::TPtr& portion) : Portion(portion) - , Start(portion->IndexKeyStart()) - , PortionId(portion->GetPortionId()) - , StartPosition(Portion->IndexKeyStart()) { + , Start(portion->IndexKeyStart()) { } friend bool operator<(const NArrow::TSimpleRow& item, const TOrderedPortion& portion) { @@ -63,13 +54,19 @@ public: } bool operator<(const TOrderedPortion& item) const { + AFL_VERIFY(Portion->GetPathId() == item.Portion->GetPathId()); auto cmp = Start.CompareNotNull(item.Start); if (cmp == std::partial_ordering::equivalent) { - return PortionId < item.PortionId; + return Portion->GetPortionId() < item.Portion->GetPortionId(); } else { return cmp == std::partial_ordering::less; } } + + bool operator==(const TOrderedPortion& item) const { + AFL_VERIFY(Portion->GetPathId() == item.Portion->GetPathId()); + return Portion->GetPortionId() == item.Portion->GetPortionId(); + } }; class TChainAddress { @@ -282,6 +279,46 @@ public: } }; +class IOverloadChecker { +private: + virtual bool DoIsOverloaded(const TSimplePortionsGroupInfo& portionsData) const = 0; + +public: + virtual ~IOverloadChecker() = default; + + bool IsOverloaded(const TSimplePortionsGroupInfo& portionsData) const { + return DoIsOverloaded(portionsData); + } +}; + +class TNoOverloadChecker: public IOverloadChecker { +private: + virtual bool DoIsOverloaded(const TSimplePortionsGroupInfo& /*portionsData*/) const override { + return false; + } +}; + +class TLimitsOverloadChecker: public IOverloadChecker { +private: + const std::optional<ui64> PortionsCountLimit; + const std::optional<ui64> PortionBlobsSizeLimit; + virtual bool DoIsOverloaded(const TSimplePortionsGroupInfo& portionsData) const override { + if (PortionsCountLimit && *PortionsCountLimit < (ui64)portionsData.GetCount()) { + return true; + } + if (PortionBlobsSizeLimit && *PortionBlobsSizeLimit < (ui64)portionsData.GetBlobBytes()) { + return true; + } + return false; + } + +public: + TLimitsOverloadChecker(const std::optional<ui64> portionsCountLimit, const std::optional<ui64> portionBlobsSizeLimit) + : PortionsCountLimit(portionsCountLimit) + , PortionBlobsSizeLimit(portionBlobsSizeLimit) { + } +}; + class IPortionsLevel { private: virtual void DoModifyPortions(const std::vector<TPortionInfo::TPtr>& add, const std::vector<TPortionInfo::TPtr>& remove) = 0; @@ -301,24 +338,48 @@ private: } YDB_READONLY(ui64, LevelId, 0); + std::vector<std::shared_ptr<IPortionsSelector>> Selectors; + std::vector<TSimplePortionsGroupInfo> SelectivePortionsInfo; + + TSimplePortionsGroupInfo* PortionsInfo = nullptr; + std::shared_ptr<IOverloadChecker> OverloadChecker = std::make_shared<TNoOverloadChecker>(); + std::shared_ptr<IPortionsSelector> DefaultPortionsSelector; + const TLevelCounters LevelCounters; protected: std::shared_ptr<IPortionsLevel> NextLevel; - TSimplePortionsGroupInfo PortionsInfo; mutable std::optional<TInstant> PredOptimization = TInstant::Now(); public: - virtual bool IsOverloaded() const { + virtual bool IsAppropriatePortionToMove(const TPortionAccessorConstructor& /*info*/) const { return false; } + virtual bool IsAppropriatePortionToStore(const TPortionAccessorConstructor& /*info*/) const { + return false; + } + + const TSimplePortionsGroupInfo& GetPortionsInfo() const { + AFL_VERIFY(PortionsInfo); + return *PortionsInfo; + } + + TSimplePortionsGroupInfo& MutablePortionsInfo() { + AFL_VERIFY(PortionsInfo); + return *PortionsInfo; + } + + bool IsOverloaded() const { + return OverloadChecker->IsOverloaded(GetPortionsInfo()); + } + bool HasData() const { - return PortionsInfo.GetCount(); + return GetPortionsInfo().GetCount(); } virtual std::optional<double> GetPackKff() const { - if (PortionsInfo.GetRawBytes()) { - return 1.0 * PortionsInfo.GetBlobBytes() / PortionsInfo.GetRawBytes(); + if (GetPortionsInfo().GetRawBytes()) { + return 1.0 * GetPortionsInfo().GetBlobBytes() / GetPortionsInfo().GetRawBytes(); } else if (!NextLevel) { return std::nullopt; } else { @@ -326,18 +387,30 @@ public: } } - const TSimplePortionsGroupInfo& GetPortionsInfo() const { - return PortionsInfo; - } - const std::shared_ptr<IPortionsLevel>& GetNextLevel() const { return NextLevel; } virtual ~IPortionsLevel() = default; - IPortionsLevel(const ui64 levelId, const std::shared_ptr<IPortionsLevel>& nextLevel) + IPortionsLevel(const ui64 levelId, const std::shared_ptr<IPortionsLevel>& nextLevel, + const std::shared_ptr<IOverloadChecker>& overloadChecker, const TLevelCounters levelCounters, + const std::vector<std::shared_ptr<IPortionsSelector>>& selectors, const TString& defaultSelectorName) : LevelId(levelId) + , Selectors(selectors) + , OverloadChecker(overloadChecker ? overloadChecker : std::make_shared<TNoOverloadChecker>()) + , LevelCounters(levelCounters) , NextLevel(nextLevel) { + SelectivePortionsInfo.resize(Selectors.size()); + ui32 idx = 0; + for (auto&& i : selectors) { + if (i->GetName() == defaultSelectorName) { + AFL_VERIFY(!DefaultPortionsSelector); + DefaultPortionsSelector = i; + PortionsInfo = &SelectivePortionsInfo[idx]; + } + ++idx; + } + AFL_VERIFY(DefaultPortionsSelector); } bool CanTakePortion(const TPortionInfo::TConstPtr& portion) const { @@ -361,8 +434,14 @@ public: NJson::TJsonValue result = NJson::JSON_MAP; result.InsertValue("level", LevelId); result.InsertValue("weight", GetWeight()); - result.InsertValue("portions", PortionsInfo.SerializeToJson()); result.InsertValue("details", DoSerializeToJson()); + auto& selectiveJson = result.InsertValue("selectivity", NJson::JSON_MAP); + ui32 idx = 0; + for (auto&& i : SelectivePortionsInfo) { + selectiveJson.InsertValue(Selectors[idx]->GetName(), i.SerializeToJson()); + ++idx; + } + return result; } @@ -379,7 +458,34 @@ public: } void ModifyPortions(const std::vector<TPortionInfo::TPtr>& add, const std::vector<TPortionInfo::TPtr>& remove) { - return DoModifyPortions(add, remove); + std::vector<TPortionInfo::TPtr> addSelective; + std::vector<TPortionInfo::TPtr> removeSelective; + for (ui32 idx = 0; idx < Selectors.size(); ++idx) { + const std::shared_ptr<IPortionsSelector>& selector = Selectors[idx]; + const bool isDefaultSelector = ((ui64)selector.get() == (ui64)DefaultPortionsSelector.get()); + for (auto&& i : remove) { + if (selector && !selector->IsAppropriate(i)) { + continue; + } + if (isDefaultSelector) { + removeSelective.emplace_back(i); + LevelCounters.Portions->RemovePortion(i); + } + SelectivePortionsInfo[idx].RemovePortion(*i); + } + for (auto&& i : add) { + if (selector && !selector->IsAppropriate(i)) { + continue; + } + if (isDefaultSelector) { + addSelective.emplace_back(i); + LevelCounters.Portions->AddPortion(i); + } + SelectivePortionsInfo[idx].AddPortion(*i); + i->InitRuntimeFeature(TPortionInfo::ERuntimeFeature::Optimized, !NextLevel); + } + } + return DoModifyPortions(addSelective, removeSelective); } ui64 GetWeight() const { diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/common_level.cpp b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/common_level.cpp index 6acb8399e42..490c4ed0743 100644 --- a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/common_level.cpp +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/common_level.cpp @@ -7,17 +7,13 @@ void TOneLayerPortions::DoModifyPortions(const std::vector<TPortionInfo::TPtr>& auto it = Portions.find(TOrderedPortion(i)); AFL_VERIFY(it != Portions.end()); AFL_VERIFY(it->GetPortion()->GetPortionId() == i->GetPortionId()); - PortionsInfo.RemovePortion(i); Portions.erase(it); - LevelCounters.Portions->RemovePortion(i); } TStringBuilder sb; for (auto&& i : add) { sb << i->GetPortionId() << ","; auto info = Portions.emplace(i); - i->AddRuntimeFeature(TPortionInfo::ERuntimeFeature::Optimized); AFL_VERIFY(info.second); - PortionsInfo.AddPortion(i); if (StrictOneLayer) { { auto it = info.first; @@ -41,7 +37,6 @@ void TOneLayerPortions::DoModifyPortions(const std::vector<TPortionInfo::TPtr>& } } } - LevelCounters.Portions->AddPortion(i); } } @@ -55,7 +50,7 @@ TCompactionTaskData TOneLayerPortions::DoGetOptimizationTask() const { if (itFwd != Portions.begin()) { --itBkwd; } - while (GetLevelBlobBytesLimit() * 0.5 + compactedData < (ui64)PortionsInfo.GetBlobBytes() && + while (GetLevelBlobBytesLimit() * 0.5 + compactedData < (ui64)GetPortionsInfo().GetBlobBytes() && (itBkwd != Portions.begin() || itFwd != Portions.end()) && result.CanTakeMore()) { if (itFwd != Portions.end() && (itBkwd == Portions.begin() || itBkwd->GetPortion()->GetTotalBlobBytes() <= itFwd->GetPortion()->GetTotalBlobBytes())) { diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/common_level.h b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/common_level.h index 59b2cf69a92..645c3f17380 100644 --- a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/common_level.h +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/common_level.h @@ -9,7 +9,6 @@ private: using TBase = IPortionsLevel; std::set<TOrderedPortion, std::less<>> Portions; - const TLevelCounters LevelCounters; const double BytesLimitFraction = 1; const ui64 ExpectedPortionSize = (1 << 20); const ui64 SizeLimitGuarantee = 0; @@ -60,9 +59,9 @@ private: if (!GetNextLevel()) { return 0; } - if ((ui64)PortionsInfo.GetBlobBytes() > GetLevelBlobBytesLimit() && PortionsInfo.GetCount() > 2 && - (ui64)PortionsInfo.GetBlobBytes() > ExpectedPortionSize * 2) { - return ((ui64)GetLevelId() << 48) + PortionsInfo.GetBlobBytes() - GetLevelBlobBytesLimit(); + if ((ui64)GetPortionsInfo().GetBlobBytes() > GetLevelBlobBytesLimit() && GetPortionsInfo().GetCount() > 2 && + (ui64)GetPortionsInfo().GetBlobBytes() > ExpectedPortionSize * 2) { + return ((ui64)GetLevelId() << 48) + GetPortionsInfo().GetBlobBytes() - GetLevelBlobBytesLimit(); } else { return 0; } @@ -75,9 +74,9 @@ private: public: TOneLayerPortions(const ui64 levelId, const double bytesLimitFraction, const ui64 expectedPortionSize, const std::shared_ptr<IPortionsLevel>& nextLevel, const std::shared_ptr<TSimplePortionsGroupInfo>& summaryPortionsInfo, - const TLevelCounters& levelCounters, const ui64 sizeLimitGuarantee, const bool strictOneLayer = true) - : TBase(levelId, nextLevel) - , LevelCounters(levelCounters) + const TLevelCounters& levelCounters, const ui64 sizeLimitGuarantee, const std::vector<std::shared_ptr<IPortionsSelector>>& selectors, + const TString& defaultSelectorName, const bool strictOneLayer = true) + : TBase(levelId, nextLevel, nullptr, levelCounters, selectors, defaultSelectorName) , BytesLimitFraction(bytesLimitFraction) , ExpectedPortionSize(expectedPortionSize) , SizeLimitGuarantee(sizeLimitGuarantee) @@ -136,7 +135,7 @@ public: virtual NArrow::NMerger::TIntervalPositions DoGetBucketPositions(const std::shared_ptr<arrow::Schema>& /*pkSchema*/) const override { NArrow::NMerger::TIntervalPositions result; for (auto&& i : Portions) { - result.AddPosition(i.GetStartPosition().BuildSortablePosition(), false); + result.AddPosition(i.GetStart().BuildSortablePosition(), false); } return result; } diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/counters.cpp b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/counters.cpp index a429ee576fd..a429ee576fd 100644 --- a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/counters.cpp +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/counters.cpp diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/counters.h b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/counters.h index 95d4924b3ec..95d4924b3ec 100644 --- a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/counters.h +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/counters.h diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/ya.make b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/ya.make new file mode 100644 index 00000000000..ac6e3e1f369 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/ya.make @@ -0,0 +1,17 @@ +LIBRARY() + +SRCS( + abstract.cpp + zero_level.cpp + common_level.cpp + counters.cpp +) + +PEERDIR( + contrib/libs/apache/arrow + ydb/core/protos + ydb/core/formats/arrow + ydb/core/tx/columnshard/engines/changes/abstract +) + +END() diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/zero_level.cpp b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/zero_level.cpp index 78a75619be5..cc008cca2ca 100644 --- a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/zero_level.cpp +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/zero_level.cpp @@ -26,7 +26,7 @@ ui64 TZeroLevelPortions::DoGetWeight() const { return 0; } if (PredOptimization && TInstant::Now() - *PredOptimization < DurationToDrop) { - if (PortionsInfo.PredictPackedBlobBytes(GetPackKff()) < ExpectedBlobsSize) { + if (GetPortionsInfo().PredictPackedBlobBytes(GetPackKff()) < ExpectedBlobsSize) { return 0; } } @@ -57,8 +57,8 @@ ui64 TZeroLevelPortions::DoGetWeight() const { } */ - const ui64 mb = (affectedRawBytes + PortionsInfo.GetRawBytes()) / 1000000 + 1; - return 1000.0 * PortionsInfo.GetCount() * PortionsInfo.GetCount() / mb; + const ui64 mb = (affectedRawBytes + GetPortionsInfo().GetRawBytes()) / 1000000 + 1; + return 1000.0 * GetPortionsInfo().GetCount() * GetPortionsInfo().GetCount() / mb; } TInstant TZeroLevelPortions::DoGetWeightExpirationInstant() const { @@ -69,15 +69,13 @@ TInstant TZeroLevelPortions::DoGetWeightExpirationInstant() const { } TZeroLevelPortions::TZeroLevelPortions(const ui32 levelIdx, const std::shared_ptr<IPortionsLevel>& nextLevel, - const TLevelCounters& levelCounters, const TDuration durationToDrop, const ui64 expectedBlobsSize, const ui64 portionsCountAvailable, - const std::optional<ui64> portionsCountLimit, const std::optional<ui64> portionsSizeLimit) - : TBase(levelIdx, nextLevel) - , LevelCounters(levelCounters) + const TLevelCounters& levelCounters, const std::shared_ptr<IOverloadChecker>& overloadChecker, const TDuration durationToDrop, + const ui64 expectedBlobsSize, const ui64 portionsCountAvailable, const std::vector<std::shared_ptr<IPortionsSelector>>& selectors, + const TString& defaultSelectorName) + : TBase(levelIdx, nextLevel, overloadChecker, levelCounters, selectors, defaultSelectorName) , DurationToDrop(durationToDrop) , ExpectedBlobsSize(expectedBlobsSize) - , PortionsCountAvailable(portionsCountAvailable) - , PortionsCountLimit(portionsCountLimit) - , PortionsSizeLimit(portionsSizeLimit) { + , PortionsCountAvailable(portionsCountAvailable) { if (DurationToDrop != TDuration::Max() && PredOptimization) { *PredOptimization -= TDuration::Seconds(RandomNumber<ui32>(DurationToDrop.Seconds())); } diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/zero_level.h b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/zero_level.h index 76f626cfd2c..9b8a7dd6435 100644 --- a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/zero_level.h +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/zero_level.h @@ -7,50 +7,12 @@ namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets { class TZeroLevelPortions: public IPortionsLevel { private: using TBase = IPortionsLevel; - const TLevelCounters LevelCounters; const TDuration DurationToDrop; const ui64 ExpectedBlobsSize; const ui64 PortionsCountAvailable; - const std::optional<ui64> PortionsCountLimit; - const std::optional<ui64> PortionsSizeLimit; - class TOrderedPortion { - private: - YDB_READONLY_DEF(TPortionInfo::TConstPtr, Portion); - public: - TOrderedPortion(const TPortionInfo::TConstPtr& portion) - : Portion(portion) { - } - - TOrderedPortion(const TPortionInfo::TPtr& portion) - : Portion(portion) { - } - - bool operator==(const TOrderedPortion& item) const { - return item.Portion->GetPathId() == Portion->GetPathId() && item.Portion->GetPortionId() == Portion->GetPortionId(); - } - - bool operator<(const TOrderedPortion& item) const { - auto cmp = Portion->IndexKeyStart().CompareNotNull(item.Portion->IndexKeyStart()); - if (cmp == std::partial_ordering::equivalent) { - return Portion->GetPortionId() < item.Portion->GetPortionId(); - } else { - return cmp == std::partial_ordering::less; - } - } - }; std::set<TOrderedPortion> Portions; - virtual bool IsOverloaded() const override { - if (PortionsCountLimit && Portions.size() > *PortionsCountLimit) { - return true; - } - if (PortionsSizeLimit && (ui64)PortionsInfo.GetBlobBytes() > (ui64)*PortionsSizeLimit) { - return true; - } - return false; - } - virtual NArrow::NMerger::TIntervalPositions DoGetBucketPositions(const std::shared_ptr<arrow::Schema>& /*pkSchema*/) const override { return NArrow::NMerger::TIntervalPositions(); } @@ -60,6 +22,14 @@ private: return std::nullopt; } + virtual bool IsAppropriatePortionToMove(const TPortionAccessorConstructor& info) const override { + return info.GetTotalBlobsSize() > ExpectedBlobsSize; + } + + virtual bool IsAppropriatePortionToStore(const TPortionAccessorConstructor& /*info*/) const override { + return true; + } + virtual ui64 DoGetAffectedPortionBytes(const NArrow::TSimpleRow& /*from*/, const NArrow::TSimpleRow& /*to*/) const override { return 0; } @@ -80,14 +50,9 @@ private: if (!constructionFlag) { AFL_VERIFY(Portions.emplace(i).second); } - PortionsInfo.AddPortion(i); - LevelCounters.Portions->AddPortion(i); - i->InitRuntimeFeature(TPortionInfo::ERuntimeFeature::Optimized, !NextLevel); } for (auto&& i : remove) { AFL_VERIFY(Portions.erase(i)); - LevelCounters.Portions->RemovePortion(i); - PortionsInfo.RemovePortion(i); } } @@ -107,8 +72,8 @@ private: public: TZeroLevelPortions(const ui32 levelIdx, const std::shared_ptr<IPortionsLevel>& nextLevel, const TLevelCounters& levelCounters, - const TDuration durationToDrop, const ui64 expectedBlobsSize, const ui64 portionsCountAvailable, - const std::optional<ui64> portionsCountLimit, const std::optional<ui64> portionsSizeLimit); + const std::shared_ptr<IOverloadChecker>& overloadChecker, const TDuration durationToDrop, const ui64 expectedBlobsSize, + const ui64 portionsCountAvailable, const std::vector<std::shared_ptr<IPortionsSelector>>& selectors, const TString& defaultSelectorName); }; } // namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/optimizer.cpp b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/optimizer.cpp index c9552fc719c..7ea35f94253 100644 --- a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/optimizer.cpp +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/optimizer.cpp @@ -1,7 +1,9 @@ -#include "accumulation_level.h" -#include "common_level.h" #include "optimizer.h" -#include "zero_level.h" + +#include "level/common_level.h" +#include "level/zero_level.h" +#include "selector/snapshot.h" +#include "selector/transparent.h" #include <ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/constructor.h> @@ -10,11 +12,22 @@ namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets { TOptimizerPlanner::TOptimizerPlanner(const TInternalPathId pathId, const std::shared_ptr<IStoragesManager>& storagesManager, - const std::shared_ptr<arrow::Schema>& primaryKeysSchema, const std::vector<TLevelConstructorContainer>& levelConstructors) + const std::shared_ptr<arrow::Schema>& primaryKeysSchema, const std::vector<TLevelConstructorContainer>& levelConstructors, + const std::vector<TSelectorConstructorContainer>& selectors) : TBase(pathId) , Counters(std::make_shared<TCounters>()) , StoragesManager(storagesManager) , PrimaryKeysSchema(primaryKeysSchema) { + { + std::set<TString> selectorNames; + for (auto&& i : selectors) { + AFL_VERIFY(selectorNames.emplace(i->GetName()).second); + Selectors.emplace_back(i->BuildSelector()); + } + if (Selectors.empty()) { + Selectors = { std::make_shared<TTransparentPortionsSelector>("default") }; + } + } std::shared_ptr<IPortionsLevel> nextLevel; /* const ui64 maxPortionBlobBytes = (ui64)1 << 20; @@ -26,17 +39,16 @@ TOptimizerPlanner::TOptimizerPlanner(const TInternalPathId pathId, const std::sh ui32 idx = levelConstructors.size(); for (auto it = levelConstructors.rbegin(); it != levelConstructors.rend(); ++it) { --idx; - Levels.emplace_back((*it)->BuildLevel(nextLevel, idx, PortionsInfo, Counters->GetLevelCounters(idx))); + Levels.emplace_back((*it)->BuildLevel(nextLevel, idx, PortionsInfo, Counters->GetLevelCounters(idx), Selectors)); nextLevel = Levels.back(); } } else { Levels.emplace_back(std::make_shared<TZeroLevelPortions>( - 2, nullptr, Counters->GetLevelCounters(2), TDuration::Max(), 1 << 20, 10, std::nullopt, std::nullopt)); - Levels.emplace_back( - std::make_shared<TZeroLevelPortions>( - 1, Levels.back(), Counters->GetLevelCounters(1), TDuration::Max(), 1 << 20, 10, std::nullopt, std::nullopt)); + 2, nullptr, Counters->GetLevelCounters(2), nullptr, TDuration::Max(), 1 << 20, 10, Selectors, "default")); + Levels.emplace_back(std::make_shared<TZeroLevelPortions>( + 1, Levels.back(), Counters->GetLevelCounters(1), nullptr, TDuration::Max(), 1 << 20, 10, Selectors, "default")); Levels.emplace_back(std::make_shared<TZeroLevelPortions>( - 0, Levels.back(), Counters->GetLevelCounters(0), TDuration::Seconds(180), 1 << 20, 10, std::nullopt, std::nullopt)); + 0, Levels.back(), Counters->GetLevelCounters(0), nullptr, TDuration::Seconds(180), 1 << 20, 10, Selectors, "default")); } std::reverse(Levels.begin(), Levels.end()); RefreshWeights(); diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/optimizer.h b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/optimizer.h index e34f678f896..53212a66045 100644 --- a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/optimizer.h +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/optimizer.h @@ -1,11 +1,13 @@ #pragma once -#include "abstract.h" -#include "counters.h" +#include "level/abstract.h" + #include <ydb/core/tx/columnshard/common/path_id.h> +#include <ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.h> namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets { class TLevelConstructorContainer; +class TSelectorConstructorContainer; class TOptimizerPlanner: public IOptimizerPlanner { private: @@ -13,10 +15,22 @@ private: std::shared_ptr<TCounters> Counters; std::shared_ptr<TSimplePortionsGroupInfo> PortionsInfo = std::make_shared<TSimplePortionsGroupInfo>(); + std::vector<std::shared_ptr<IPortionsSelector>> Selectors; std::vector<std::shared_ptr<IPortionsLevel>> Levels; std::map<ui64, std::shared_ptr<IPortionsLevel>, std::greater<ui64>> LevelsByWeight; const std::shared_ptr<IStoragesManager> StoragesManager; const std::shared_ptr<arrow::Schema> PrimaryKeysSchema; + + virtual ui32 GetAppropriateLevel(const ui32 baseLevel, const TPortionAccessorConstructor& info) const override { + ui32 result = baseLevel; + for (ui32 i = baseLevel; i + 1 < Levels.size(); ++i) { + if (Levels[i]->IsAppropriatePortionToMove(info) && Levels[i + 1]->IsAppropriatePortionToStore(info)) { + result = i + 1; + } + } + return result; + } + virtual bool DoIsOverloaded() const override { for (auto&& i : Levels) { if (i->IsOverloaded()) { @@ -51,8 +65,7 @@ protected: return false; } - virtual void DoModifyPortions( - const THashMap<ui64, TPortionInfo::TPtr>& add, const THashMap<ui64, TPortionInfo::TPtr>& remove) override { + virtual void DoModifyPortions(const THashMap<ui64, TPortionInfo::TPtr>& add, const THashMap<ui64, TPortionInfo::TPtr>& remove) override { std::vector<std::vector<TPortionInfo::TPtr>> removePortionsByLevel; removePortionsByLevel.resize(Levels.size()); for (auto&& [_, i] : remove) { @@ -91,7 +104,7 @@ protected: if (i->GetTotalBlobBytes() > 512 * 1024 && i->GetMeta().GetProduced() != NPortion::EProduced::INSERTED) { for (i32 levelIdx = Levels.size() - 1; levelIdx >= 0; --levelIdx) { if (Levels[levelIdx]->CanTakePortion(i)) { - Levels[levelIdx]->ModifyPortions({i}, {}); + Levels[levelIdx]->ModifyPortions({ i }, {}); i->MutableMeta().ResetCompactionLevel(levelIdx); break; } @@ -149,7 +162,8 @@ public: } TOptimizerPlanner(const TInternalPathId pathId, const std::shared_ptr<IStoragesManager>& storagesManager, - const std::shared_ptr<arrow::Schema>& primaryKeysSchema, const std::vector<TLevelConstructorContainer>& levelConstructors); + const std::shared_ptr<arrow::Schema>& primaryKeysSchema, const std::vector<TLevelConstructorContainer>& levelConstructors, + const std::vector<TSelectorConstructorContainer>& selectors); }; } // namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/accumulation_level.cpp b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/selector/abstract.cpp index fcd7fcbb9bb..de23092161e 100644 --- a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/accumulation_level.cpp +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/selector/abstract.cpp @@ -1,4 +1,4 @@ -#include "accumulation_level.h" +#include "abstract.h" namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets { diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/selector/abstract.h b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/selector/abstract.h new file mode 100644 index 00000000000..2b240844b17 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/selector/abstract.h @@ -0,0 +1,25 @@ +#pragma once +#include <ydb/core/tx/columnshard/engines/portions/portion_info.h> + +#include <ydb/services/bg_tasks/abstract/interface.h> + +namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets { + +class IPortionsSelector { +private: + YDB_READONLY_DEF(TString, Name); + virtual bool DoIsAppropriate(const TPortionInfo::TPtr& portionInfo) const = 0; + +public: + virtual ~IPortionsSelector() = default; + + IPortionsSelector(const TString& name) + : Name(name) { + } + + bool IsAppropriate(const TPortionInfo::TPtr& portionInfo) const { + return DoIsAppropriate(portionInfo); + } +}; + +} // namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/selector/empty.cpp b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/selector/empty.cpp new file mode 100644 index 00000000000..d5706c6ffed --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/selector/empty.cpp @@ -0,0 +1,5 @@ +#include "empty.h" + +namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets { + +} diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/selector/empty.h b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/selector/empty.h new file mode 100644 index 00000000000..de4ff29a440 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/selector/empty.h @@ -0,0 +1,17 @@ +#pragma once +#include "abstract.h" + +namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets { + +class TEmptyPortionsSelector: public IPortionsSelector { +private: + using TBase = IPortionsSelector; + virtual bool DoIsAppropriate(const TPortionInfo::TPtr& /*portionInfo*/) const override { + return false; + } + +public: + using TBase::TBase; +}; + +} // namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/selector/snapshot.cpp b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/selector/snapshot.cpp new file mode 100644 index 00000000000..666d5deb896 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/selector/snapshot.cpp @@ -0,0 +1,5 @@ +#include "snapshot.h" + +namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets { + +} diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/selector/snapshot.h b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/selector/snapshot.h new file mode 100644 index 00000000000..5bbab88fbc6 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/selector/snapshot.h @@ -0,0 +1,93 @@ +#pragma once +#include "abstract.h" + +namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets { + +class TDataSnapshotInterval { +private: + YDB_READONLY_DEF(std::optional<TInstant>, StartInstant); + YDB_READONLY_DEF(std::optional<TInstant>, FinishInstant); + +public: + bool IsEmpty() const { + return !StartInstant && !FinishInstant; + } + + bool operator==(const TDataSnapshotInterval& item) const = default; + + TDataSnapshotInterval() = default; + TDataSnapshotInterval(const std::optional<TInstant>& start, const std::optional<TInstant>& finish) + : StartInstant(start) + , FinishInstant(finish) { + AFL_VERIFY(!StartInstant || !FinishInstant || *StartInstant <= *FinishInstant); + } + + TConclusionStatus DeserializeFromJson(const NJson::TJsonValue& jsonValue) { + if (!jsonValue.IsMap()) { + return TConclusionStatus::Fail("json have to be a map"); + } + if (jsonValue.Has("start_seconds_utc")) { + if (!jsonValue["start_seconds_utc"].IsUInteger()) { + return TConclusionStatus::Fail("json start_seconds_utc value have to be unsigned int"); + } + StartInstant = TInstant::Seconds(jsonValue["start_seconds_utc"].GetUInteger()); + } + if (jsonValue.Has("finish_seconds_utc")) { + if (!jsonValue["finish_seconds_utc"].IsUInteger()) { + return TConclusionStatus::Fail("json finish_seconds_utc value have to be unsigned int"); + } + FinishInstant = TInstant::Seconds(jsonValue["finish_seconds_utc"].GetUInteger()); + } + return TConclusionStatus::Success(); + } + + template <class TProto> + TConclusionStatus DeserializeFromProto(const TProto& proto) { + if (proto.HasStartSecondsUTC()) { + StartInstant = TInstant::Seconds(proto.GetStartSecondsUTC()); + } + if (proto.HasFinishSecondsUTC()) { + FinishInstant = TInstant::Seconds(proto.GetFinishSecondsUTC()); + } + return TConclusionStatus::Success(); + } + + NKikimrSchemeOp::TCompactionSelectorConstructorContainer::TDataSnapshotInterval SerializeToProto() const { + NKikimrSchemeOp::TCompactionSelectorConstructorContainer::TDataSnapshotInterval result; + if (StartInstant) { + result.SetStartSecondsUTC(StartInstant->Seconds()); + } + if (FinishInstant) { + result.SetFinishSecondsUTC(FinishInstant->Seconds()); + } + return result; + } + + bool CheckPortion(const TPortionInfo::TPtr& p) const { + if (StartInstant && p->RecordSnapshotMax().GetPlanInstant() < *StartInstant) { + return false; + } + if (FinishInstant && *FinishInstant <= p->RecordSnapshotMin().GetPlanInstant()) { + return false; + } + return true; + } +}; + +class TSnapshotPortionsSelector: public IPortionsSelector { +private: + using TBase = IPortionsSelector; + const TDataSnapshotInterval DataSnapshotInterval; + + virtual bool DoIsAppropriate(const TPortionInfo::TPtr& portionInfo) const override { + return DataSnapshotInterval.CheckPortion(portionInfo); + } + +public: + TSnapshotPortionsSelector(const TDataSnapshotInterval& dataSnapshotInterval, const TString& name) + : TBase(name) + , DataSnapshotInterval(dataSnapshotInterval) { + } +}; + +} // namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/selector/transparent.cpp b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/selector/transparent.cpp new file mode 100644 index 00000000000..b58e84aad57 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/selector/transparent.cpp @@ -0,0 +1,5 @@ +#include "transparent.h" + +namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets { + +} diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/selector/transparent.h b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/selector/transparent.h new file mode 100644 index 00000000000..f5aea9bb2f8 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/selector/transparent.h @@ -0,0 +1,17 @@ +#pragma once +#include "abstract.h" + +namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets { + +class TTransparentPortionsSelector: public IPortionsSelector { +private: + using TBase = IPortionsSelector; + virtual bool DoIsAppropriate(const TPortionInfo::TPtr& /*portionInfo*/) const override { + return true; + } + +public: + using TBase::TBase; +}; + +} // namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/selector/ya.make b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/selector/ya.make new file mode 100644 index 00000000000..e12bd5f5db4 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/selector/ya.make @@ -0,0 +1,17 @@ +LIBRARY() + +SRCS( + abstract.cpp + snapshot.cpp + empty.cpp + transparent.cpp +) + +PEERDIR( + contrib/libs/apache/arrow + ydb/core/protos + ydb/core/formats/arrow + ydb/core/tx/columnshard/engines/changes/abstract +) + +END() diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/ya.make b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/ya.make index 7eba1467e8b..3efeb2941fe 100644 --- a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/ya.make +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/ya.make @@ -1,11 +1,7 @@ LIBRARY() SRCS( - abstract.cpp - zero_level.cpp - common_level.cpp GLOBAL optimizer.cpp - counters.cpp ) PEERDIR( @@ -13,6 +9,8 @@ PEERDIR( ydb/core/protos ydb/core/formats/arrow ydb/core/tx/columnshard/engines/changes/abstract + ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level + ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/selector ) END() |