aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov333 <ivanmorozov@ydb.tech>2025-05-19 19:24:28 +0300
committerGitHub <noreply@github.com>2025-05-19 19:24:28 +0300
commit0ccb7680e0d44713fec49f415247e65f56b9a74c (patch)
treec8b6fd4d3da0298fd19546981b8aa27530d68a9c
parentae7b1d937a8ca2fe77ce76b19eaf17fa9e267925 (diff)
downloadydb-0ccb7680e0d44713fec49f415247e65f56b9a74c.tar.gz
snapshot interval (#18452)
Co-authored-by: ivanmorozov333 <imorozov333@ya.ru>
-rw-r--r--ydb/core/formats/arrow/accessor/abstract/accessor.h2
-rw-r--r--ydb/core/formats/arrow/accessor/sub_columns/accessor.cpp6
-rw-r--r--ydb/core/formats/arrow/accessor/sub_columns/accessor.h4
-rw-r--r--ydb/core/formats/arrow/accessor/sub_columns/constructor.cpp4
-rw-r--r--ydb/core/formats/arrow/accessor/sub_columns/ut/ut_sub_columns.cpp4
-rw-r--r--ydb/core/kqp/ut/olap/kqp_olap_ut.cpp2
-rw-r--r--ydb/core/kqp/ut/olap/optimizer_ut.cpp144
-rw-r--r--ydb/core/protos/flat_scheme_op.proto29
-rw-r--r--ydb/core/tx/columnshard/engines/changes/compaction.cpp5
-rw-r--r--ydb/core/tx/columnshard/engines/changes/with_appended.cpp7
-rw-r--r--ydb/core/tx/columnshard/engines/portions/constructor_accessor.h12
-rw-r--r--ydb/core/tx/columnshard/engines/portions/constructor_meta.h6
-rw-r--r--ydb/core/tx/columnshard/engines/storage/granule/granule.cpp23
-rw-r--r--ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.h16
-rw-r--r--ydb/core/tx/columnshard/engines/storage/optimizer/lbuckets/constructor/constructor.cpp6
-rw-r--r--ydb/core/tx/columnshard/engines/storage/optimizer/lbuckets/constructor/constructor.h1
-rw-r--r--ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/constructor.cpp71
-rw-r--r--ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/constructor.h55
-rw-r--r--ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/level/constructor.cpp4
-rw-r--r--ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/level/constructor.h81
-rw-r--r--ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/level/one_layer.cpp (renamed from ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/one_layer.cpp)6
-rw-r--r--ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/level/one_layer.h (renamed from ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/one_layer.h)7
-rw-r--r--ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/level/ya.make16
-rw-r--r--ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/level/zero_level.cpp (renamed from ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/zero_level.cpp)41
-rw-r--r--ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/level/zero_level.h (renamed from ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/zero_level.h)9
-rw-r--r--ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/selector/constructor.cpp5
-rw-r--r--ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/selector/constructor.h74
-rw-r--r--ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/selector/empty.cpp26
-rw-r--r--ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/selector/empty.h26
-rw-r--r--ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/selector/snapshot.cpp37
-rw-r--r--ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/selector/snapshot.h30
-rw-r--r--ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/selector/transparent.cpp26
-rw-r--r--ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/selector/transparent.h26
-rw-r--r--ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/selector/ya.make17
-rw-r--r--ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/ya.make4
-rw-r--r--ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/accumulation_level.h115
-rw-r--r--ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/abstract.cpp (renamed from ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/abstract.cpp)0
-rw-r--r--ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/abstract.h (renamed from ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/abstract.h)158
-rw-r--r--ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/common_level.cpp (renamed from ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/common_level.cpp)7
-rw-r--r--ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/common_level.h (renamed from ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/common_level.h)15
-rw-r--r--ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/counters.cpp (renamed from ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/counters.cpp)0
-rw-r--r--ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/counters.h (renamed from ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/counters.h)0
-rw-r--r--ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/ya.make17
-rw-r--r--ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/zero_level.cpp (renamed from ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/zero_level.cpp)18
-rw-r--r--ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/zero_level.h (renamed from ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/zero_level.h)55
-rw-r--r--ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/optimizer.cpp32
-rw-r--r--ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/optimizer.h26
-rw-r--r--ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/selector/abstract.cpp (renamed from ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/accumulation_level.cpp)2
-rw-r--r--ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/selector/abstract.h25
-rw-r--r--ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/selector/empty.cpp5
-rw-r--r--ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/selector/empty.h17
-rw-r--r--ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/selector/snapshot.cpp5
-rw-r--r--ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/selector/snapshot.h93
-rw-r--r--ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/selector/transparent.cpp5
-rw-r--r--ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/selector/transparent.h17
-rw-r--r--ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/selector/ya.make17
-rw-r--r--ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/ya.make6
57 files changed, 1085 insertions, 382 deletions
diff --git a/ydb/core/formats/arrow/accessor/abstract/accessor.h b/ydb/core/formats/arrow/accessor/abstract/accessor.h
index 692c3cd3c33..1c884adc9d3 100644
--- a/ydb/core/formats/arrow/accessor/abstract/accessor.h
+++ b/ydb/core/formats/arrow/accessor/abstract/accessor.h
@@ -367,7 +367,7 @@ public:
for (ui32 currentIndex = 0; currentIndex < arr->GetRecordsCount();) {
arrCurrent = arr->GetArray(arrCurrent, currentIndex, arr);
auto result = actor(arrCurrent->GetArray());
- if (!!result) {
+ if (result) {
return result;
}
currentIndex = currentIndex + arrCurrent->GetArray()->GetRecordsCount();
diff --git a/ydb/core/formats/arrow/accessor/sub_columns/accessor.cpp b/ydb/core/formats/arrow/accessor/sub_columns/accessor.cpp
index f33a04785e6..71c541d93f4 100644
--- a/ydb/core/formats/arrow/accessor/sub_columns/accessor.cpp
+++ b/ydb/core/formats/arrow/accessor/sub_columns/accessor.cpp
@@ -17,9 +17,9 @@
namespace NKikimr::NArrow::NAccessor {
TConclusion<std::shared_ptr<TSubColumnsArray>> TSubColumnsArray::Make(
- const std::shared_ptr<IChunkedArray>& sourceArray, const NSubColumns::TSettings& settings) {
+ const std::shared_ptr<IChunkedArray>& sourceArray, const NSubColumns::TSettings& settings, const std::shared_ptr<arrow::DataType>& columnType) {
AFL_VERIFY(sourceArray);
- NSubColumns::TDataBuilder builder(sourceArray->GetDataType(), settings);
+ NSubColumns::TDataBuilder builder(columnType, settings);
IChunkedArray::TReader reader(sourceArray);
std::vector<std::shared_ptr<arrow::Array>> storage;
for (ui32 i = 0; i < reader.GetRecordsCount();) {
@@ -40,6 +40,7 @@ TSubColumnsArray::TSubColumnsArray(const std::shared_ptr<arrow::DataType>& type,
, ColumnsData(NSubColumns::TColumnsData::BuildEmpty(recordsCount))
, OthersData(NSubColumns::TOthersData::BuildEmpty())
, Settings(settings) {
+ AFL_VERIFY(type->id() == arrow::binary()->id())("type", type->ToString())("error", "currently supported JsonDocument only");
}
TSubColumnsArray::TSubColumnsArray(NSubColumns::TColumnsData&& columns, NSubColumns::TOthersData&& others,
@@ -48,6 +49,7 @@ TSubColumnsArray::TSubColumnsArray(NSubColumns::TColumnsData&& columns, NSubColu
, ColumnsData(std::move(columns))
, OthersData(std::move(others))
, Settings(settings) {
+ AFL_VERIFY(type->id() == arrow::binary()->id())("type", type->ToString())("error", "currently supported JsonDocument only");
}
TString TSubColumnsArray::SerializeToString(const TChunkConstructionData& externalInfo) const {
diff --git a/ydb/core/formats/arrow/accessor/sub_columns/accessor.h b/ydb/core/formats/arrow/accessor/sub_columns/accessor.h
index 2fd9f60329f..6ff2dd1635a 100644
--- a/ydb/core/formats/arrow/accessor/sub_columns/accessor.h
+++ b/ydb/core/formats/arrow/accessor/sub_columns/accessor.h
@@ -101,8 +101,8 @@ public:
TSubColumnsArray(NSubColumns::TColumnsData&& columns, NSubColumns::TOthersData&& others, const std::shared_ptr<arrow::DataType>& type,
const ui32 recordsCount, const NSubColumns::TSettings& settings);
- static TConclusion<std::shared_ptr<TSubColumnsArray>> Make(
- const std::shared_ptr<IChunkedArray>& sourceArray, const NSubColumns::TSettings& settings);
+ static TConclusion<std::shared_ptr<TSubColumnsArray>> Make(const std::shared_ptr<IChunkedArray>& sourceArray,
+ const NSubColumns::TSettings& settings, const std::shared_ptr<arrow::DataType>& columnType);
TSubColumnsArray(const std::shared_ptr<arrow::DataType>& type, const ui32 recordsCount, const NSubColumns::TSettings& settings);
diff --git a/ydb/core/formats/arrow/accessor/sub_columns/constructor.cpp b/ydb/core/formats/arrow/accessor/sub_columns/constructor.cpp
index 900049f303b..94b265cdae9 100644
--- a/ydb/core/formats/arrow/accessor/sub_columns/constructor.cpp
+++ b/ydb/core/formats/arrow/accessor/sub_columns/constructor.cpp
@@ -66,8 +66,8 @@ bool TConstructor::DoDeserializeFromProto(const NKikimrArrowAccessorProto::TCons
}
TConclusion<std::shared_ptr<IChunkedArray>> TConstructor::DoConstruct(
- const std::shared_ptr<IChunkedArray>& originalData, const TChunkConstructionData& /*externalInfo*/) const {
- auto conclusion = NAccessor::TSubColumnsArray::Make(originalData, Settings);
+ const std::shared_ptr<IChunkedArray>& originalData, const TChunkConstructionData& externalInfo) const {
+ auto conclusion = NAccessor::TSubColumnsArray::Make(originalData, Settings, externalInfo.GetColumnType());
if (conclusion.IsFail()) {
return conclusion;
}
diff --git a/ydb/core/formats/arrow/accessor/sub_columns/ut/ut_sub_columns.cpp b/ydb/core/formats/arrow/accessor/sub_columns/ut/ut_sub_columns.cpp
index edeef71ee95..17e5643695d 100644
--- a/ydb/core/formats/arrow/accessor/sub_columns/ut/ut_sub_columns.cpp
+++ b/ydb/core/formats/arrow/accessor/sub_columns/ut/ut_sub_columns.cpp
@@ -71,7 +71,7 @@ Y_UNIT_TEST_SUITE(SubColumnsArrayAccessor) {
++idx;
}
auto bJsonArr = arrBuilder.Finish(jsons.size());
- auto arrData = TSubColumnsArray::Make(bJsonArr, settings).DetachResult();
+ auto arrData = TSubColumnsArray::Make(bJsonArr, settings, bJsonArr->GetDataType()).DetachResult();
Cerr << arrData->DebugJson() << Endl;
AFL_VERIFY(PrintBinaryJsons(arrData->GetChunkedArray()) == R"([[{"a":"1","b":"1","c":"111"},null,{"a1":"2","b":"2","c":"222"},{"a":"3","b":"3","c":"333"},null,{"a":"5","b1":"5"}]])")(
"string", PrintBinaryJsons(arrData->GetChunkedArray()));
@@ -163,7 +163,7 @@ Y_UNIT_TEST_SUITE(SubColumnsArrayAccessor) {
++idx;
}
auto bJsonArr = arrBuilder.Finish(jsons.size());
- auto arrData = TSubColumnsArray::Make(bJsonArr, settings).DetachResult();
+ auto arrData = TSubColumnsArray::Make(bJsonArr, settings, bJsonArr->GetDataType()).DetachResult();
Cerr << arrData->DebugJson() << Endl;
AFL_VERIFY(PrintBinaryJsons(arrData->GetChunkedArray()) == R"([[{"a":"1","b":"1","c":"111"},null,{"a1":"2","b":"2","c":"222"},{"a":"3","b":"3","c":"333"},null,{"a":"5","b1":"5"}]])")(
"string", PrintBinaryJsons(arrData->GetChunkedArray()));
diff --git a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
index 9768e24c060..9283bae146a 100644
--- a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
+++ b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
@@ -3220,7 +3220,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
auto rows = ExecuteScanQuery(tableClient, R"(
SELECT level, SUM(records_count) as sum_records_count FROM (
--!syntax_v1
- SELECT JSON_VALUE(CAST(`Details` AS JsonDocument), '$.level') as level, CAST(JSON_VALUE(CAST(`Details` AS JsonDocument), '$.portions.records_count') AS Uint64) as records_count, Details
+ SELECT JSON_VALUE(CAST(`Details` AS JsonDocument), '$.level') as level, CAST(JSON_VALUE(CAST(`Details` AS JsonDocument), '$.selectivity.default.records_count') AS Uint64) as records_count, Details
FROM `/Root/olapStore/olapTable/.sys/primary_index_optimizer_stats`
)
GROUP BY level
diff --git a/ydb/core/kqp/ut/olap/optimizer_ut.cpp b/ydb/core/kqp/ut/olap/optimizer_ut.cpp
index b9f87f89a7b..b4d7ac2ac45 100644
--- a/ydb/core/kqp/ut/olap/optimizer_ut.cpp
+++ b/ydb/core/kqp/ut/olap/optimizer_ut.cpp
@@ -16,6 +16,147 @@
namespace NKikimr::NKqp {
Y_UNIT_TEST_SUITE(KqpOlapOptimizer) {
+ Y_UNIT_TEST(SpecialSliceToOneLayer) {
+ auto settings = TKikimrSettings().SetWithSampleTables(false);
+ TKikimrRunner kikimr(settings);
+
+ auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NYDBTest::NColumnShard::TController>();
+ csController->SetOverridePeriodicWakeupActivationPeriod(TDuration::Seconds(1));
+ csController->SetOverrideLagForCompactionBeforeTierings(TDuration::Seconds(1));
+ csController->SetOverrideMemoryLimitForPortionReading(1e+10);
+ csController->SetOverrideBlobSplitSettings(NOlap::NSplitter::TSplitSettings().SetMaxPortionSize(30000));
+
+ TLocalHelper(kikimr).CreateTestOlapTable("olapTable", "olapStore", 1, 1);
+ auto tableClient = kikimr.GetTableClient();
+
+ {
+ auto alterQuery =
+ R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_OPTIONS, `COMPACTION_PLANNER.CLASS_NAME`=`lc-buckets`, `COMPACTION_PLANNER.FEATURES`=`
+ {"levels" : [{"class_name" : "Zero", "expected_blobs_size" : 20000, "portions_size_limit" : 400000, "portions_count_available" : 2},
+ {"class_name" : "Zero", "expected_blobs_size" : 20000, "portions_count_available" : 1, "default_selector_name" : "slice"},
+ {"class_name" : "OneLayer", "expected_portion_size" : 40000, "size_limit_guarantee" : 100000000, "bytes_limit_fraction" : 1}],
+ "selectors" : [{"class_name" : "Transparent", "name" : "default"}, {"class_name" : "Snapshot", "name" : "slice", "interval" : {"finish_seconds_utc" : 0}}]}`);
+ )";
+ auto session = tableClient.CreateSession().GetValueSync().GetSession();
+ auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), NYdb::EStatus::SUCCESS, alterResult.GetIssues().ToString());
+ }
+
+ for (ui32 i = 0; i < 100; ++i) {
+ WriteTestData(kikimr, "/Root/olapStore/olapTable", 0, i * 1000, 10000);
+ if (i % 10 == 0) {
+ csController->WaitCompactions(TDuration::MilliSeconds(10));
+ }
+ }
+ csController->WaitCompactions(TDuration::Seconds(10));
+
+ {
+ auto it = tableClient
+ .StreamExecuteScanQuery(R"(
+ --!syntax_v1
+ SELECT COUNT(*)
+ FROM `/Root/olapStore/olapTable`
+ )")
+ .GetValueSync();
+ UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
+ TString result = StreamResultToYson(it);
+ Cout << result << Endl;
+ CompareYson(result, R"([[109000u;]])");
+ }
+
+ {
+ auto it = tableClient
+ .StreamExecuteScanQuery(R"(
+ --!syntax_v1
+ SELECT CAST(JSON_VALUE(CAST(Details AS JsonDocument), "$.level") AS Uint64) AS LEVEL, CAST(JSON_VALUE(CAST(Details AS JsonDocument), "$.selectivity.default.records_count") AS Uint64) AS RECORDS_COUNT
+ FROM `/Root/olapStore/olapTable/.sys/primary_index_optimizer_stats`
+ ORDER BY LEVEL
+ )")
+ .GetValueSync();
+ UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
+
+ auto rows = CollectRows(it);
+ AFL_VERIFY(rows.size() == 3);
+ AFL_VERIFY(0 == GetUint64(rows[0].at("LEVEL")));
+ AFL_VERIFY(GetUint64(rows[0].at("RECORDS_COUNT")) == 0);
+ AFL_VERIFY(1 == GetUint64(rows[1].at("LEVEL")));
+ AFL_VERIFY(GetUint64(rows[1].at("RECORDS_COUNT")) >= 440000);
+ AFL_VERIFY(GetUint64(rows[1].at("RECORDS_COUNT")) <= 550000);
+ AFL_VERIFY(2 == GetUint64(rows[2].at("LEVEL")));
+ AFL_VERIFY(GetUint64(rows[2].at("RECORDS_COUNT")) == 0);
+
+ for (auto&& i : rows) {
+ Cerr << GetUint64(i.at("LEVEL")) << "/" << GetUint64(i.at("RECORDS_COUNT")) << Endl;
+ }
+ }
+
+ {
+ auto alterQuery =
+ TStringBuilder() << Sprintf(
+ R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_OPTIONS, `COMPACTION_PLANNER.CLASS_NAME`=`lc-buckets`, `COMPACTION_PLANNER.FEATURES`=`
+ {"levels" : [{"class_name" : "Zero", "expected_blobs_size" : 20000, "portions_size_limit" : 100000, "portions_count_available" : 1},
+ {"class_name" : "Zero", "expected_blobs_size" : 20000, "portions_size_limit" : 100000, "portions_count_available" : 1, "default_selector_name" : "slice"},
+ {"class_name" : "OneLayer", "expected_portion_size" : 40000, "size_limit_guarantee" : 100000000, "bytes_limit_fraction" : 1}],
+ "selectors" : [{"class_name" : "Snapshot", "name" : "default"}, {"class_name" : "Snapshot", "name" : "slice", "interval" : {"finish_seconds_utc" : %d}}]}`);
+ )",
+ Now().Seconds());
+ auto session = tableClient.CreateSession().GetValueSync().GetSession();
+ auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), NYdb::EStatus::SUCCESS, alterResult.GetIssues().ToString());
+ }
+ csController->WaitCompactions(TDuration::Seconds(10));
+ {
+ auto it = tableClient
+ .StreamExecuteScanQuery(R"(
+ --!syntax_v1
+ SELECT COUNT(*)
+ FROM `/Root/olapStore/olapTable`
+ )")
+ .GetValueSync();
+ UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
+ TString result = StreamResultToYson(it);
+ Cout << result << Endl;
+ CompareYson(result, R"([[109000u;]])");
+ }
+
+ {
+ auto it = tableClient
+ .StreamExecuteScanQuery(R"(
+ --!syntax_v1
+ SELECT CAST(JSON_VALUE(CAST(Details AS JsonDocument), "$.level") AS Uint64) AS LEVEL, Details
+ FROM `/Root/olapStore/olapTable/.sys/primary_index_optimizer_stats`
+ ORDER BY LEVEL
+ )")
+ .GetValueSync();
+ UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
+ TString result = StreamResultToYson(it);
+ Cerr << result << Endl;
+ }
+
+ {
+ auto it = tableClient
+ .StreamExecuteScanQuery(R"(
+ --!syntax_v1
+ SELECT CAST(JSON_VALUE(CAST(Details AS JsonDocument), "$.level") AS Uint64) AS LEVEL,
+ CAST(JSON_VALUE(CAST(Details AS JsonDocument), "$.selectivity.default.records_count") AS Uint64) AS RECORDS_COUNT_DEFAULT,
+ CAST(JSON_VALUE(CAST(Details AS JsonDocument), "$.selectivity.slice.records_count") AS Uint64) AS RECORDS_COUNT_SLICE
+ FROM `/Root/olapStore/olapTable/.sys/primary_index_optimizer_stats`
+ ORDER BY LEVEL
+ )")
+ .GetValueSync();
+ UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
+ auto rows = CollectRows(it);
+ for (auto&& i : rows) {
+ Cerr << GetUint64(i.at("LEVEL")) << "/" << GetUint64(i.at("RECORDS_COUNT_DEFAULT")) << "/" << GetUint64(i.at("RECORDS_COUNT_SLICE")) << Endl;
+ }
+ AFL_VERIFY(0 == GetUint64(rows[0].at("LEVEL")));
+ AFL_VERIFY(GetUint64(rows[0].at("RECORDS_COUNT_DEFAULT")) == 0);
+ AFL_VERIFY(1 == GetUint64(rows[1].at("LEVEL")));
+ AFL_VERIFY(GetUint64(rows[1].at("RECORDS_COUNT_DEFAULT")) == 0);
+ AFL_VERIFY(2 == GetUint64(rows[2].at("LEVEL")));
+ AFL_VERIFY(GetUint64(rows[2].at("RECORDS_COUNT_SLICE")) == 109000);
+ }
+ }
Y_UNIT_TEST(MultiLayersOptimization) {
auto settings = TKikimrSettings().SetWithSampleTables(false);
TKikimrRunner kikimr(settings);
@@ -70,7 +211,7 @@ Y_UNIT_TEST_SUITE(KqpOlapOptimizer) {
auto it = tableClient
.StreamExecuteScanQuery(R"(
--!syntax_v1
- SELECT CAST(JSON_VALUE(CAST(Details AS JsonDocument), "$.level") AS Uint64) AS LEVEL, CAST(JSON_VALUE(CAST(Details AS JsonDocument), "$.portions.blob_bytes") AS Uint64) AS BYTES
+ SELECT CAST(JSON_VALUE(CAST(Details AS JsonDocument), "$.level") AS Uint64) AS LEVEL, CAST(JSON_VALUE(CAST(Details AS JsonDocument), "$.selectivity.default.blob_bytes") AS Uint64) AS BYTES
FROM `/Root/olapStore/olapTable/.sys/primary_index_optimizer_stats`
ORDER BY LEVEL
)")
@@ -82,6 +223,7 @@ Y_UNIT_TEST_SUITE(KqpOlapOptimizer) {
for (auto&& i : rows) {
AFL_VERIFY(levelIdx == GetUint64(i.at("LEVEL")));
AFL_VERIFY(GetUint64(i.at("BYTES")) < maxVal[levelIdx]);
+ Cerr << GetUint64(i.at("LEVEL")) << "/" << GetUint64(i.at("BYTES")) << Endl;
++levelIdx;
}
}
diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto
index 60775127f2f..56db83b2b5a 100644
--- a/ydb/core/protos/flat_scheme_op.proto
+++ b/ydb/core/protos/flat_scheme_op.proto
@@ -521,6 +521,7 @@ message TStorageTierConfig {
message TCompactionLevelConstructorContainer {
optional string ClassName = 1;
+ optional string DefaultSelectorName = 2;
message TZeroLevel {
optional uint32 PortionsLiveDurationSeconds = 1;
@@ -543,6 +544,33 @@ message TCompactionLevelConstructorContainer {
}
+message TCompactionSelectorConstructorContainer {
+ optional string ClassName = 1;
+ optional string Name = 2;
+
+ message TTransparentSelector {
+ }
+
+ message TEmptySelector {
+ }
+
+ message TDataSnapshotInterval {
+ optional uint32 StartSecondsUTC = 1;
+ optional uint32 FinishSecondsUTC = 2;
+ }
+
+ message TSnapshotSelector {
+ optional TDataSnapshotInterval Interval = 1;
+ }
+
+ oneof Implementation {
+ TEmptySelector Empty = 10;
+ TTransparentSelector Transparent = 11;
+ TSnapshotSelector Snapshot = 12;
+ }
+
+}
+
message TCompactionPlannerConstructorContainer {
optional string ClassName = 1;
@@ -557,6 +585,7 @@ message TCompactionPlannerConstructorContainer {
message TLCOptimizer {
repeated TCompactionLevelConstructorContainer Levels = 1;
+ repeated TCompactionSelectorConstructorContainer Selectors = 2;
}
oneof Implementation {
diff --git a/ydb/core/tx/columnshard/engines/changes/compaction.cpp b/ydb/core/tx/columnshard/engines/changes/compaction.cpp
index 61b9e021dba..b86c5ff28ed 100644
--- a/ydb/core/tx/columnshard/engines/changes/compaction.cpp
+++ b/ydb/core/tx/columnshard/engines/changes/compaction.cpp
@@ -25,7 +25,10 @@ void TCompactColumnEngineChanges::DoCompile(TFinalizationContext& context) {
const TPortionMeta::EProduced producedClassResultCompaction = GetResultProducedClass();
for (auto& portionInfo : AppendedPortions) {
- portionInfo.GetPortionConstructor().MutablePortionConstructor().MutableMeta().UpdateRecordsMeta(producedClassResultCompaction);
+ auto& constructor = portionInfo.GetPortionConstructor().MutablePortionConstructor();
+ constructor.MutableMeta().UpdateRecordsMeta(producedClassResultCompaction);
+ constructor.MutableMeta().SetCompactionLevel(GranuleMeta->GetOptimizerPlanner().GetAppropriateLevel(
+ GetPortionsToMove().GetTargetCompactionLevel().value_or(0), portionInfo.GetPortionConstructor()));
}
}
diff --git a/ydb/core/tx/columnshard/engines/changes/with_appended.cpp b/ydb/core/tx/columnshard/engines/changes/with_appended.cpp
index e4b711ca6df..7006793a71c 100644
--- a/ydb/core/tx/columnshard/engines/changes/with_appended.cpp
+++ b/ydb/core/tx/columnshard/engines/changes/with_appended.cpp
@@ -79,15 +79,14 @@ void TChangesWithAppend::DoWriteIndexOnComplete(NColumnShard::TColumnShard* self
void TChangesWithAppend::DoCompile(TFinalizationContext& context) {
AFL_VERIFY(PortionsToRemove.GetSize() + PortionsToMove.GetSize() + AppendedPortions.size() || NoAppendIsCorrect);
for (auto&& i : AppendedPortions) {
- i.GetPortionConstructor().MutablePortionConstructor().SetPortionId(context.NextPortionId());
- i.GetPortionConstructor().MutablePortionConstructor().MutableMeta().SetCompactionLevel(PortionsToMove.GetTargetCompactionLevel().value_or(0));
+ auto& constructor = i.GetPortionConstructor().MutablePortionConstructor();
+ constructor.SetPortionId(context.NextPortionId());
+ constructor.MutableMeta().SetCompactionLevel(GetPortionsToMove().GetTargetCompactionLevel().value_or(0));
}
}
void TChangesWithAppend::DoOnAfterCompile() {
for (auto&& i : AppendedPortions) {
- i.GetPortionConstructor().MutablePortionConstructor().MutableMeta().SetCompactionLevel(
- PortionsToMove.GetTargetCompactionLevel().value_or(0));
i.FinalizePortionConstructor();
}
}
diff --git a/ydb/core/tx/columnshard/engines/portions/constructor_accessor.h b/ydb/core/tx/columnshard/engines/portions/constructor_accessor.h
index ae04f95199e..3eb721d2b94 100644
--- a/ydb/core/tx/columnshard/engines/portions/constructor_accessor.h
+++ b/ydb/core/tx/columnshard/engines/portions/constructor_accessor.h
@@ -144,6 +144,18 @@ public:
}
+ ui64 GetTotalBlobsSize() const {
+ AFL_VERIFY(Records.size());
+ ui64 size = 0;
+ for (auto&& r : Records) {
+ size += r.GetBlobRange().GetSize();
+ }
+ for (auto&& r : Indexes) {
+ size += r.GetDataSize();
+ }
+ return size;
+ }
+
static TPortionAccessorConstructor BuildForRewriteBlobs(const TPortionInfo& portion) {
return TPortionAccessorConstructor(portion.BuildConstructor(true, false));
}
diff --git a/ydb/core/tx/columnshard/engines/portions/constructor_meta.h b/ydb/core/tx/columnshard/engines/portions/constructor_meta.h
index ab222ed7e96..fe7de12fe30 100644
--- a/ydb/core/tx/columnshard/engines/portions/constructor_meta.h
+++ b/ydb/core/tx/columnshard/engines/portions/constructor_meta.h
@@ -37,6 +37,12 @@ public:
TPortionMetaConstructor() = default;
TPortionMetaConstructor(const TPortionMeta& meta, const bool withBlobs);
+ ui64 GetTotalBlobBytes() const {
+ AFL_VERIFY(ColumnBlobBytes);
+ AFL_VERIFY(IndexBlobBytes);
+ return *ColumnBlobBytes + *IndexBlobBytes;
+ }
+
const TBlobRange RestoreBlobRange(const TBlobRangeLink16& linkRange) const {
return linkRange.RestoreRange(GetBlobId(linkRange.GetBlobIdxVerified()));
}
diff --git a/ydb/core/tx/columnshard/engines/storage/granule/granule.cpp b/ydb/core/tx/columnshard/engines/storage/granule/granule.cpp
index ac510197dd5..0165bc43857 100644
--- a/ydb/core/tx/columnshard/engines/storage/granule/granule.cpp
+++ b/ydb/core/tx/columnshard/engines/storage/granule/granule.cpp
@@ -13,14 +13,10 @@
namespace NKikimr::NOlap {
-
-
void TGranuleMeta::AppendPortion(const std::shared_ptr<TPortionInfo>& info) {
- AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "upsert_portion")("portion", info->DebugString())(
- "path_id", GetPathId());
+ AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "upsert_portion")("portion", info->DebugString())("path_id", GetPathId());
AFL_VERIFY(!Portions.contains(info->GetPortionId()));
- AFL_VERIFY(info->GetPathId() == GetPathId())("event", "incompatible_granule")("portion", info->DebugString())(
- "path_id", GetPathId());
+ AFL_VERIFY(info->GetPathId() == GetPathId())("event", "incompatible_granule")("portion", info->DebugString())("path_id", GetPathId());
AFL_VERIFY(info->ValidSnapshotInfo())("event", "incorrect_portion_snapshots")("portion", info->DebugString());
@@ -133,8 +129,8 @@ const NKikimr::NOlap::TGranuleAdditiveSummary& TGranuleMeta::GetAdditiveSummary(
return *AdditiveSummaryCache;
}
-TGranuleMeta::TGranuleMeta(
- const TInternalPathId pathId, const TGranulesStorage& owner, const NColumnShard::TGranuleDataCounters& counters, const TVersionedIndex& versionedIndex)
+TGranuleMeta::TGranuleMeta(const TInternalPathId pathId, const TGranulesStorage& owner, const NColumnShard::TGranuleDataCounters& counters,
+ const TVersionedIndex& versionedIndex)
: PathId(pathId)
, DataAccessorsManager(owner.GetDataAccessorsManager())
, Counters(counters)
@@ -235,11 +231,12 @@ std::shared_ptr<NKikimr::ITxReader> TGranuleMeta::BuildLoader(
bool TGranuleMeta::TestingLoad(IDbWrapper& db, const TVersionedIndex& versionedIndex) {
TInGranuleConstructors constructors;
{
- if (!db.LoadPortions(PathId, [&](std::unique_ptr<TPortionInfoConstructor>&& portion, const NKikimrTxColumnShard::TIndexPortionMeta& metaProto) {
- const TIndexInfo& indexInfo = portion->GetSchema(versionedIndex)->GetIndexInfo();
- AFL_VERIFY(portion->MutableMeta().LoadMetadata(metaProto, indexInfo, db.GetDsGroupSelectorVerified()));
- AFL_VERIFY(constructors.AddConstructorVerified(std::move(portion)));
- })) {
+ if (!db.LoadPortions(
+ PathId, [&](std::unique_ptr<TPortionInfoConstructor>&& portion, const NKikimrTxColumnShard::TIndexPortionMeta& metaProto) {
+ const TIndexInfo& indexInfo = portion->GetSchema(versionedIndex)->GetIndexInfo();
+ AFL_VERIFY(portion->MutableMeta().LoadMetadata(metaProto, indexInfo, db.GetDsGroupSelectorVerified()));
+ AFL_VERIFY(constructors.AddConstructorVerified(std::move(portion)));
+ })) {
return false;
}
}
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.h b/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.h
index 20c68212068..62345cf2484 100644
--- a/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.h
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.h
@@ -14,6 +14,7 @@ class TColumnEngineChanges;
class IStoragesManager;
class TGranuleMeta;
class TPortionInfo;
+class TPortionAccessorConstructor;
namespace NDataLocks {
class TManager;
}
@@ -87,7 +88,6 @@ private:
virtual bool DoIsOverloaded() const {
return false;
}
-
protected:
virtual void DoModifyPortions(const THashMap<ui64, std::shared_ptr<TPortionInfo>>& add,
const THashMap<ui64, std::shared_ptr<TPortionInfo>>& remove) = 0;
@@ -108,6 +108,10 @@ protected:
}
public:
+ virtual ui32 GetAppropriateLevel(const ui32 baseLevel, const TPortionAccessorConstructor& /*info*/) const {
+ return baseLevel;
+ }
+
IOptimizerPlanner(const TInternalPathId pathId)
: PathId(pathId) {
}
@@ -195,7 +199,6 @@ private:
virtual TConclusion<std::shared_ptr<IOptimizerPlanner>> DoBuildPlanner(const TBuildContext& context) const = 0;
virtual void DoSerializeToProto(TProto& proto) const = 0;
virtual bool DoDeserializeFromProto(const TProto& proto) = 0;
- virtual bool DoIsEqualTo(const IOptimizerPlannerConstructor& item) const = 0;
virtual TConclusionStatus DoDeserializeFromJson(const NJson::TJsonValue& jsonInfo) = 0;
virtual bool DoApplyToCurrentObject(IOptimizerPlanner& current) const = 0;
@@ -230,10 +233,11 @@ public:
bool IsEqualTo(const std::shared_ptr<IOptimizerPlannerConstructor>& item) const {
AFL_VERIFY(!!item);
- if (GetClassName() != item->GetClassName()) {
- return false;
- }
- return DoIsEqualTo(*item);
+ TProto selfProto;
+ TProto itemProto;
+ SerializeToProto(selfProto);
+ item->SerializeToProto(itemProto);
+ return selfProto.SerializeAsString() == itemProto.SerializeAsString();
}
bool DeserializeFromProto(const TProto& proto) {
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lbuckets/constructor/constructor.cpp b/ydb/core/tx/columnshard/engines/storage/optimizer/lbuckets/constructor/constructor.cpp
index 753e30db760..c66583b7647 100644
--- a/ydb/core/tx/columnshard/engines/storage/optimizer/lbuckets/constructor/constructor.cpp
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lbuckets/constructor/constructor.cpp
@@ -15,12 +15,6 @@ bool TOptimizerPlannerConstructor::DoApplyToCurrentObject(IOptimizerPlanner& cur
return true;
}
-bool TOptimizerPlannerConstructor::DoIsEqualTo(const IOptimizerPlannerConstructor& item) const {
- const auto* itemClass = dynamic_cast<const TOptimizerPlannerConstructor*>(&item);
- AFL_VERIFY(itemClass);
- return true;
-}
-
void TOptimizerPlannerConstructor::DoSerializeToProto(TProto& proto) const {
*proto.MutableLBuckets() = NKikimrSchemeOp::TCompactionPlannerConstructorContainer::TLOptimizer();
}
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lbuckets/constructor/constructor.h b/ydb/core/tx/columnshard/engines/storage/optimizer/lbuckets/constructor/constructor.h
index 299fb90ae72..4f9d86b0ab0 100644
--- a/ydb/core/tx/columnshard/engines/storage/optimizer/lbuckets/constructor/constructor.h
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lbuckets/constructor/constructor.h
@@ -20,7 +20,6 @@ private:
virtual bool DoApplyToCurrentObject(IOptimizerPlanner& current) const override;
virtual TConclusion<std::shared_ptr<IOptimizerPlanner>> DoBuildPlanner(const TBuildContext& context) const override;
- virtual bool DoIsEqualTo(const IOptimizerPlannerConstructor& item) const override;
public:
virtual TString GetClassName() const override {
return GetClassNameStatic();
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/constructor.cpp b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/constructor.cpp
index 29abc5f4acb..dcf0927a9bf 100644
--- a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/constructor.cpp
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/constructor.cpp
@@ -1,32 +1,15 @@
#include "constructor.h"
+
#include <ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/optimizer.h>
namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets {
-NKikimr::TConclusion<std::shared_ptr<NKikimr::NOlap::NStorageOptimizer::IOptimizerPlanner>> TOptimizerPlannerConstructor::DoBuildPlanner(const TBuildContext& context) const {
- return std::make_shared<TOptimizerPlanner>(context.GetPathId(), context.GetStorages(), context.GetPKSchema(), Levels);
+TConclusion<std::shared_ptr<IOptimizerPlanner>> TOptimizerPlannerConstructor::DoBuildPlanner(const TBuildContext& context) const {
+ return std::make_shared<TOptimizerPlanner>(context.GetPathId(), context.GetStorages(), context.GetPKSchema(), Levels, Selectors); // the planner receives both the level and the selector constructors
}
-bool TOptimizerPlannerConstructor::DoApplyToCurrentObject(IOptimizerPlanner& current) const {
- auto* itemClass = dynamic_cast<TOptimizerPlanner*>(&current);
- if (!itemClass) {
- return false;
- }
- return true;
-}
-
-bool TOptimizerPlannerConstructor::DoIsEqualTo(const IOptimizerPlannerConstructor& item) const {
- const auto* itemClass = dynamic_cast<const TOptimizerPlannerConstructor*>(&item);
- AFL_VERIFY(itemClass);
- if (Levels.size() != itemClass->Levels.size()) {
- return false;
- }
- for (ui32 i = 0; i < Levels.size(); ++i) {
- if (!Levels[i]->IsEqualTo(*itemClass->Levels[i].GetObjectPtrVerified())) {
- return false;
- }
- }
- return true;
+bool TOptimizerPlannerConstructor::DoApplyToCurrentObject(IOptimizerPlanner& /*current*/) const {
+ return false; // unconditional: the existing planner object is never inspected here
}
void TOptimizerPlannerConstructor::DoSerializeToProto(TProto& proto) const {
@@ -34,6 +17,9 @@ void TOptimizerPlannerConstructor::DoSerializeToProto(TProto& proto) const {
for (auto&& i : Levels) {
*proto.MutableLCBuckets()->AddLevels() = i.SerializeToProto();
}
+ for (auto&& i : Selectors) {
+ *proto.MutableLCBuckets()->AddSelectors() = i.SerializeToProto();
+ }
}
bool TOptimizerPlannerConstructor::DoDeserializeFromProto(const TProto& proto) {
@@ -49,10 +35,42 @@ bool TOptimizerPlannerConstructor::DoDeserializeFromProto(const TProto& proto) {
}
Levels.emplace_back(std::move(lContainer));
}
+ for (auto&& i : proto.GetLCBuckets().GetSelectors()) {
+ TSelectorConstructorContainer container;
+ if (!container.DeserializeFromProto(i)) {
+ AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("error", "cannot parse lc-bucket selector")("proto", i.DebugString());
+ return false;
+ }
+ Selectors.emplace_back(std::move(container));
+ }
return true;
}
NKikimr::TConclusionStatus TOptimizerPlannerConstructor::DoDeserializeFromJson(const NJson::TJsonValue& jsonInfo) {
+ std::set<TString> selectorNames;
+ if (jsonInfo.Has("selectors")) {
+ if (!jsonInfo["selectors"].IsArray()) {
+ return TConclusionStatus::Fail("selectors have to been array in json description");
+ }
+ auto& arr = jsonInfo["selectors"].GetArray();
+ if (!arr.size()) {
+ return TConclusionStatus::Fail("no objects in json array 'selectors'");
+ }
+ for (auto&& i : arr) {
+ const auto className = i["class_name"].GetStringRobust();
+ auto selector = ISelectorConstructor::TFactory::MakeHolder(className);
+ if (!selector) {
+ return TConclusionStatus::Fail("incorrect portions selector class_name: " + className);
+ }
+ if (!selector->DeserializeFromJson(i)) {
+ return TConclusionStatus::Fail("cannot parse portions selector: " + i.GetStringRobust());
+ }
+ Selectors.emplace_back(TSelectorConstructorContainer(std::shared_ptr<ISelectorConstructor>(selector.Release())));
+ if (!selectorNames.emplace(Selectors.back()->GetName()).second) {
+ return TConclusionStatus::Fail("selector name duplication: '" + Selectors.back()->GetName() + "'");
+ }
+ }
+ }
if (!jsonInfo.Has("levels")) {
return TConclusionStatus::Fail("no levels description");
}
@@ -74,6 +92,15 @@ NKikimr::TConclusionStatus TOptimizerPlannerConstructor::DoDeserializeFromJson(c
return TConclusionStatus::Fail("cannot parse level: " + i.GetStringRobust() + "; " + parseConclusion.GetErrorMessage());
}
Levels.emplace_back(TLevelConstructorContainer(std::shared_ptr<ILevelConstructor>(level.Release())));
+ if (selectorNames.empty()) {
+ if (Levels.back()->GetDefaultSelectorName() != "default") {
+ return TConclusionStatus::Fail("incorrect default selector name for level: '" + Levels.back()->GetDefaultSelectorName() + "'");
+ }
+ } else {
+ if (!selectorNames.contains(Levels.back()->GetDefaultSelectorName())) {
+ return TConclusionStatus::Fail("unknown default selector name for level: '" + Levels.back()->GetDefaultSelectorName() + "'");
+ }
+ }
}
return TConclusionStatus::Success();
}
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/constructor.h b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/constructor.h
index ece83fc6f89..6872fe817b9 100644
--- a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/constructor.h
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/constructor.h
@@ -1,58 +1,11 @@
#pragma once
+#include "level/constructor.h"
+#include "selector/constructor.h"
+
#include <ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.h>
-#include <ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/counters.h>
-#include <ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/abstract.h>
namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets {
-class ILevelConstructor {
-private:
- virtual std::shared_ptr<IPortionsLevel> DoBuildLevel(const std::shared_ptr<IPortionsLevel>& nextLevel, const ui32 indexLevel,
- const std::shared_ptr<TSimplePortionsGroupInfo>& portionsInfo, const TLevelCounters& counters) const = 0;
- virtual TConclusionStatus DoDeserializeFromJson(const NJson::TJsonValue& json) = 0;
- virtual bool DoDeserializeFromProto(const NKikimrSchemeOp::TCompactionLevelConstructorContainer& proto) = 0;
- virtual void DoSerializeToProto(NKikimrSchemeOp::TCompactionLevelConstructorContainer& proto) const = 0;
- virtual bool IsEqualToSameClass(const ILevelConstructor& item) const = 0;
-
-public:
- using TFactory = NObjectFactory::TObjectFactory<ILevelConstructor, TString>;
- using TProto = NKikimrSchemeOp::TCompactionLevelConstructorContainer;
-
- virtual ~ILevelConstructor() = default;
-
- bool IsEqualTo(const ILevelConstructor& item) const {
- if (GetClassName() != item.GetClassName()) {
- return false;
- }
- return IsEqualToSameClass(item);
- }
-
- std::shared_ptr<IPortionsLevel> BuildLevel(const std::shared_ptr<IPortionsLevel>& nextLevel, const ui32 indexLevel,
- const std::shared_ptr<TSimplePortionsGroupInfo>& portionsInfo, const TLevelCounters& counters) const {
- return DoBuildLevel(nextLevel, indexLevel, portionsInfo, counters);
- }
-
- TConclusionStatus DeserializeFromJson(const NJson::TJsonValue& json) {
- return DoDeserializeFromJson(json);
- }
-
- bool DeserializeFromProto(const TProto& proto) {
- return DoDeserializeFromProto(proto);
- }
- void SerializeToProto(NKikimrSchemeOp::TCompactionLevelConstructorContainer& proto) const {
- return DoSerializeToProto(proto);
- }
- virtual TString GetClassName() const = 0;
-};
-
-class TLevelConstructorContainer: public NBackgroundTasks::TInterfaceProtoContainer<ILevelConstructor> {
-private:
- using TBase = NBackgroundTasks::TInterfaceProtoContainer<ILevelConstructor>;
-
-public:
- using TBase::TBase;
-};
-
class TOptimizerPlannerConstructor: public IOptimizerPlannerConstructor {
public:
static TString GetClassNameStatic() {
@@ -61,6 +14,7 @@ public:
private:
std::vector<TLevelConstructorContainer> Levels;
+ std::vector<TSelectorConstructorContainer> Selectors;
static inline const TFactory::TRegistrator<TOptimizerPlannerConstructor> Registrator =
TFactory::TRegistrator<TOptimizerPlannerConstructor>(GetClassNameStatic());
@@ -72,7 +26,6 @@ private:
virtual bool DoApplyToCurrentObject(IOptimizerPlanner& current) const override;
virtual TConclusion<std::shared_ptr<IOptimizerPlanner>> DoBuildPlanner(const TBuildContext& context) const override;
- virtual bool DoIsEqualTo(const IOptimizerPlannerConstructor& item) const override;
public:
virtual TString GetClassName() const override {
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/level/constructor.cpp b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/level/constructor.cpp
new file mode 100644
index 00000000000..1f9801b0d23
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/level/constructor.cpp
@@ -0,0 +1,4 @@
+#include "constructor.h"
+
+namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets {
+} // namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/level/constructor.h b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/level/constructor.h
new file mode 100644
index 00000000000..0d76ced744a
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/level/constructor.h
@@ -0,0 +1,81 @@
+#pragma once
+#include <ydb/core/protos/flat_scheme_op.pb.h>
+#include <ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/abstract.h>
+#include <ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/counters.h>
+
+#include <ydb/services/bg_tasks/abstract/interface.h>
+
+#include <library/cpp/json/writer/json_value.h>
+
+namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets {
+
+class ILevelConstructor { // Describes one compaction level: parsed from JSON/proto, then used to build an IPortionsLevel
+private:
+    YDB_READONLY(TString, DefaultSelectorName, "default"); // selector name this level refers to; "default" unless configured otherwise
+    virtual std::shared_ptr<IPortionsLevel> DoBuildLevel(const std::shared_ptr<IPortionsLevel>& nextLevel, const ui32 indexLevel,
+        const std::shared_ptr<TSimplePortionsGroupInfo>& portionsInfo, const TLevelCounters& counters,
+        const std::vector<std::shared_ptr<IPortionsSelector>>& selectors) const = 0;
+    virtual TConclusionStatus DoDeserializeFromJson(const NJson::TJsonValue& json) = 0;
+    virtual bool DoDeserializeFromProto(const NKikimrSchemeOp::TCompactionLevelConstructorContainer& proto) = 0;
+    virtual void DoSerializeToProto(NKikimrSchemeOp::TCompactionLevelConstructorContainer& proto) const = 0;
+
+public:
+    using TFactory = NObjectFactory::TObjectFactory<ILevelConstructor, TString>;
+    using TProto = NKikimrSchemeOp::TCompactionLevelConstructorContainer;
+
+    virtual ~ILevelConstructor() = default;
+
+    bool IsEqualTo(const ILevelConstructor& item) const { // equal iff both serialize to identical proto bytes
+        return SerializeToProto().SerializeAsString() == item.SerializeToProto().SerializeAsString();
+    }
+
+    std::shared_ptr<IPortionsLevel> BuildLevel(const std::shared_ptr<IPortionsLevel>& nextLevel, const ui32 indexLevel,
+        const std::shared_ptr<TSimplePortionsGroupInfo>& portionsInfo, const TLevelCounters& counters,
+        const std::vector<std::shared_ptr<IPortionsSelector>>& selectors) const {
+        return DoBuildLevel(nextLevel, indexLevel, portionsInfo, counters, selectors);
+    }
+
+    TConclusionStatus DeserializeFromJson(const NJson::TJsonValue& json) { // "default_selector_name" is optional; the rest is parsed by the subclass
+        if (!json.Has("default_selector_name")) {
+            DefaultSelectorName = "default"; // same reset as DeserializeFromProto, so a reused object keeps no stale name
+        } else if (!json["default_selector_name"].IsString()) {
+            return TConclusionStatus::Fail("default_selector_name have to be string");
+        } else if (!json["default_selector_name"].GetString()) {
+            return TConclusionStatus::Fail("default_selector_name have to be not empty string");
+        } else {
+            DefaultSelectorName = json["default_selector_name"].GetString();
+        }
+        return DoDeserializeFromJson(json);
+    }
+
+    bool DeserializeFromProto(const TProto& proto) { // a missing proto field means the "default" selector
+        if (proto.HasDefaultSelectorName()) {
+            DefaultSelectorName = proto.GetDefaultSelectorName();
+        } else {
+            DefaultSelectorName = "default";
+        }
+        return DoDeserializeFromProto(proto);
+    }
+    void SerializeToProto(NKikimrSchemeOp::TCompactionLevelConstructorContainer& proto) const {
+        if (DefaultSelectorName != "default") { // the implicit default is not written, so explicit and implicit "default" serialize alike
+            proto.SetDefaultSelectorName(DefaultSelectorName);
+        }
+        return DoSerializeToProto(proto);
+    }
+    NKikimrSchemeOp::TCompactionLevelConstructorContainer SerializeToProto() const { // by-value convenience overload
+        NKikimrSchemeOp::TCompactionLevelConstructorContainer result;
+        SerializeToProto(result);
+        return result;
+    }
+    virtual TString GetClassName() const = 0;
+};
+
+class TLevelConstructorContainer: public NBackgroundTasks::TInterfaceProtoContainer<ILevelConstructor> {
+private:
+ using TBase = NBackgroundTasks::TInterfaceProtoContainer<ILevelConstructor>;
+
+public:
+ using TBase::TBase; // inherits every constructor of TInterfaceProtoContainer<ILevelConstructor>; nothing else is added
+};
+
+} // namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/one_layer.cpp b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/level/one_layer.cpp
index bb252c4c5a4..3d80b4de5df 100644
--- a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/one_layer.cpp
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/level/one_layer.cpp
@@ -1,6 +1,6 @@
#include "one_layer.h"
-#include <ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/common_level.h>
+#include <ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/common_level.h>
namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets {
@@ -65,9 +65,9 @@ void TOneLayerConstructor::DoSerializeToProto(NKikimrSchemeOp::TCompactionLevelC
std::shared_ptr<NStorageOptimizer::NLCBuckets::IPortionsLevel> TOneLayerConstructor::DoBuildLevel(
const std::shared_ptr<IPortionsLevel>& nextLevel, const ui32 indexLevel, const std::shared_ptr<TSimplePortionsGroupInfo>& portionsInfo,
- const TLevelCounters& counters) const {
+ const TLevelCounters& counters, const std::vector<std::shared_ptr<IPortionsSelector>>& selectors) const { // selectors: list handed over by the caller
return std::make_shared<TOneLayerPortions>(indexLevel, BytesLimitFraction.value_or(1), ExpectedPortionSize.value_or(2 << 20), nextLevel,
- portionsInfo, counters, SizeLimitGuarantee);
+ portionsInfo, counters, SizeLimitGuarantee, selectors, GetDefaultSelectorName()); // selectors and this level's default selector name are forwarded to the level
}
} // namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/one_layer.h b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/level/one_layer.h
index e128962c3f2..37367cb3243 100644
--- a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/one_layer.h
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/level/one_layer.h
@@ -15,14 +15,11 @@ private:
ui64 SizeLimitGuarantee = 0;
virtual std::shared_ptr<IPortionsLevel> DoBuildLevel(const std::shared_ptr<IPortionsLevel>& nextLevel, const ui32 indexLevel,
- const std::shared_ptr<TSimplePortionsGroupInfo>& portionsInfo, const TLevelCounters& counters) const override;
+ const std::shared_ptr<TSimplePortionsGroupInfo>& portionsInfo, const TLevelCounters& counters,
+ const std::vector<std::shared_ptr<IPortionsSelector>>& selectors) const override;
virtual TConclusionStatus DoDeserializeFromJson(const NJson::TJsonValue& json) override;
virtual bool DoDeserializeFromProto(const NKikimrSchemeOp::TCompactionLevelConstructorContainer& proto) override;
virtual void DoSerializeToProto(NKikimrSchemeOp::TCompactionLevelConstructorContainer& proto) const override;
- virtual bool IsEqualToSameClass(const ILevelConstructor& item) const override {
- const auto& itemCast = dynamic_cast<const TOneLayerConstructor&>(item);
- return BytesLimitFraction == itemCast.BytesLimitFraction && ExpectedPortionSize == itemCast.ExpectedPortionSize;
- }
static const inline TFactory::TRegistrator<TOneLayerConstructor> Registrator =
TFactory::TRegistrator<TOneLayerConstructor>(GetClassNameStatic());
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/level/ya.make b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/level/ya.make
new file mode 100644
index 00000000000..84d55e6557e
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/level/ya.make
@@ -0,0 +1,16 @@
+LIBRARY()
+
+SRCS(
+ constructor.cpp
+ GLOBAL zero_level.cpp
+ GLOBAL one_layer.cpp
+)
+
+PEERDIR(
+ contrib/libs/apache/arrow
+ ydb/core/protos
+ ydb/core/formats/arrow
+ ydb/core/tx/columnshard/engines/changes/abstract
+)
+
+END()
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/zero_level.cpp b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/level/zero_level.cpp
index 2ad81f0dbe4..a1c09c28b5f 100644
--- a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/zero_level.cpp
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/level/zero_level.cpp
@@ -1,6 +1,6 @@
#include "zero_level.h"
-#include <ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/zero_level.h>
+#include <ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/zero_level.h>
namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets {
@@ -54,47 +54,50 @@ bool TZeroLevelConstructor::DoDeserializeFromProto(const NKikimrSchemeOp::TCompa
if (!proto.HasZeroLevel()) {
return true;
}
- if (proto.GetZeroLevel().HasPortionsLiveDurationSeconds()) {
- PortionsLiveDuration = TDuration::Seconds(proto.GetZeroLevel().GetPortionsLiveDurationSeconds());
+ auto& pLevel = proto.GetZeroLevel();
+ if (pLevel.HasPortionsLiveDurationSeconds()) {
+ PortionsLiveDuration = TDuration::Seconds(pLevel.GetPortionsLiveDurationSeconds());
}
- if (proto.GetZeroLevel().HasExpectedBlobsSize()) {
- ExpectedBlobsSize = proto.GetZeroLevel().GetExpectedBlobsSize();
+ if (pLevel.HasExpectedBlobsSize()) {
+ ExpectedBlobsSize = pLevel.GetExpectedBlobsSize();
}
- if (proto.GetZeroLevel().HasPortionsCountAvailable()) {
- PortionsCountAvailable = proto.GetZeroLevel().GetPortionsCountAvailable();
+ if (pLevel.HasPortionsCountAvailable()) {
+ PortionsCountAvailable = pLevel.GetPortionsCountAvailable();
}
- if (proto.GetZeroLevel().HasPortionsCountLimit()) {
- PortionsCountLimit = proto.GetZeroLevel().GetPortionsCountLimit();
+ if (pLevel.HasPortionsCountLimit()) {
+ PortionsCountLimit = pLevel.GetPortionsCountLimit();
}
- if (proto.GetZeroLevel().HasPortionsSizeLimit()) {
- PortionsSizeLimit = proto.GetZeroLevel().GetPortionsSizeLimit();
+ if (pLevel.HasPortionsSizeLimit()) {
+ PortionsSizeLimit = pLevel.GetPortionsSizeLimit();
}
return true;
}
void TZeroLevelConstructor::DoSerializeToProto(NKikimrSchemeOp::TCompactionLevelConstructorContainer& proto) const {
+ auto& mLevel = *proto.MutableZeroLevel(); // obtained unconditionally, before any optional field is checked
if (PortionsLiveDuration) {
- proto.MutableZeroLevel()->SetPortionsLiveDurationSeconds(PortionsLiveDuration->Seconds());
+ mLevel.SetPortionsLiveDurationSeconds(PortionsLiveDuration->Seconds()); // stored in whole seconds
}
if (ExpectedBlobsSize) {
- proto.MutableZeroLevel()->SetExpectedBlobsSize(*ExpectedBlobsSize);
+ mLevel.SetExpectedBlobsSize(*ExpectedBlobsSize);
}
if (PortionsCountAvailable) {
- proto.MutableZeroLevel()->SetPortionsCountAvailable(*PortionsCountAvailable);
+ mLevel.SetPortionsCountAvailable(*PortionsCountAvailable);
}
if (PortionsCountLimit) {
- proto.MutableZeroLevel()->SetPortionsCountLimit(*PortionsCountLimit);
+ mLevel.SetPortionsCountLimit(*PortionsCountLimit);
}
if (PortionsSizeLimit) {
- proto.MutableZeroLevel()->SetPortionsSizeLimit(*PortionsSizeLimit);
+ mLevel.SetPortionsSizeLimit(*PortionsSizeLimit); // like the fields above, written only when the optional holds a value
}
}
std::shared_ptr<NKikimr::NOlap::NStorageOptimizer::NLCBuckets::IPortionsLevel> TZeroLevelConstructor::DoBuildLevel(
const std::shared_ptr<IPortionsLevel>& nextLevel, const ui32 indexLevel, const std::shared_ptr<TSimplePortionsGroupInfo>& /*portionsInfo*/,
- const TLevelCounters& counters) const {
- return std::make_shared<TZeroLevelPortions>(indexLevel, nextLevel, counters, PortionsLiveDuration.value_or(TDuration::Max()),
- ExpectedBlobsSize.value_or((ui64)1 << 20), PortionsCountAvailable.value_or(10), PortionsCountLimit, PortionsSizeLimit);
+ const TLevelCounters& counters, const std::vector<std::shared_ptr<IPortionsSelector>>& selectors) const {
+ return std::make_shared<TZeroLevelPortions>(indexLevel, nextLevel, counters,
+ std::make_shared<TLimitsOverloadChecker>(PortionsCountLimit, PortionsSizeLimit), PortionsLiveDuration.value_or(TDuration::Max()), // count/size limits are wrapped into a TLimitsOverloadChecker
+ ExpectedBlobsSize.value_or((ui64)1 << 20), PortionsCountAvailable.value_or(10), selectors, GetDefaultSelectorName()); // fallbacks when unset: 1 << 20 for blobs size, 10 for available portions count
}
} // namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/zero_level.h b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/level/zero_level.h
index 45ed3c41a11..4fb8cda439b 100644
--- a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/zero_level.h
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/level/zero_level.h
@@ -17,16 +17,11 @@ private:
std::optional<ui64> PortionsSizeLimit;
virtual std::shared_ptr<IPortionsLevel> DoBuildLevel(const std::shared_ptr<IPortionsLevel>& nextLevel, const ui32 indexLevel,
- const std::shared_ptr<TSimplePortionsGroupInfo>& portionsInfo, const TLevelCounters& counters) const override;
+ const std::shared_ptr<TSimplePortionsGroupInfo>& portionsInfo, const TLevelCounters& counters,
+ const std::vector<std::shared_ptr<IPortionsSelector>>& selectors) const override;
virtual TConclusionStatus DoDeserializeFromJson(const NJson::TJsonValue& json) override;
virtual bool DoDeserializeFromProto(const NKikimrSchemeOp::TCompactionLevelConstructorContainer& proto) override;
virtual void DoSerializeToProto(NKikimrSchemeOp::TCompactionLevelConstructorContainer& proto) const override;
- virtual bool IsEqualToSameClass(const ILevelConstructor& item) const override {
- const auto& itemCast = dynamic_cast<const TZeroLevelConstructor&>(item);
- return PortionsLiveDuration == itemCast.PortionsLiveDuration && ExpectedBlobsSize == itemCast.ExpectedBlobsSize &&
- PortionsCountAvailable == itemCast.PortionsCountAvailable && PortionsCountLimit == itemCast.PortionsCountLimit &&
- PortionsSizeLimit == itemCast.PortionsSizeLimit;
- }
static const inline TFactory::TRegistrator<TZeroLevelConstructor> Registrator =
TFactory::TRegistrator<TZeroLevelConstructor>(GetClassNameStatic());
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/selector/constructor.cpp b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/selector/constructor.cpp
new file mode 100644
index 00000000000..2796ac2563e
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/selector/constructor.cpp
@@ -0,0 +1,5 @@
+#include "constructor.h"
+
+namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets {
+
+} // namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/selector/constructor.h b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/selector/constructor.h
new file mode 100644
index 00000000000..21c66a399c5
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/selector/constructor.h
@@ -0,0 +1,74 @@
+#pragma once
+#include <ydb/core/protos/flat_scheme_op.pb.h>
+#include <ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/selector/abstract.h>
+
+#include <ydb/services/bg_tasks/abstract/interface.h>
+
+#include <library/cpp/json/writer/json_value.h>
+
+namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets {
+
+class ISelectorConstructor { // Base for named portion-selector descriptions; (de)serializable via JSON and proto
+private:
+    YDB_READONLY_DEF(TString, Name); // selector name; both deserializers reject an empty value
+    virtual std::shared_ptr<IPortionsSelector> DoBuildSelector() const = 0;
+    virtual TConclusionStatus DoDeserializeFromJson(const NJson::TJsonValue& json) = 0;
+    virtual bool DoDeserializeFromProto(const NKikimrSchemeOp::TCompactionSelectorConstructorContainer& proto) = 0;
+    virtual void DoSerializeToProto(NKikimrSchemeOp::TCompactionSelectorConstructorContainer& proto) const = 0;
+
+public:
+    using TFactory = NObjectFactory::TObjectFactory<ISelectorConstructor, TString>;
+    using TProto = NKikimrSchemeOp::TCompactionSelectorConstructorContainer;
+
+    virtual ~ISelectorConstructor() = default;
+
+    bool IsEqualTo(const ISelectorConstructor& item) const { // equal iff both serialize to identical proto bytes
+        return SerializeToProto().SerializeAsString() == item.SerializeToProto().SerializeAsString();
+    }
+
+    std::shared_ptr<IPortionsSelector> BuildSelector() const {
+        return DoBuildSelector();
+    }
+
+    TConclusionStatus DeserializeFromJson(const NJson::TJsonValue& json) { // "name" is optional and falls back to "default"
+        if (!json.Has("name")) {
+            Name = "default";
+        } else if (!json["name"].IsString()) {
+            return TConclusionStatus::Fail("name field have to be string");
+        } else if (!json["name"].GetString()) { // an empty name would be written to proto and then refused by DeserializeFromProto
+            return TConclusionStatus::Fail("name field have to be not empty string");
+        } else {
+            Name = json["name"].GetString();
+        }
+        return DoDeserializeFromJson(json);
+    }
+
+    bool DeserializeFromProto(const TProto& proto) { // fails (with an error log) when the stored name is empty
+        Name = proto.GetName();
+        if (!Name) {
+            AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "cannot parse proto selector constructor")("reason", "empty name");
+            return false;
+        }
+        return DoDeserializeFromProto(proto);
+    }
+    void SerializeToProto(NKikimrSchemeOp::TCompactionSelectorConstructorContainer& proto) const {
+        proto.SetName(Name); // the name is always written, followed by the subclass payload
+        return DoSerializeToProto(proto);
+    }
+    NKikimrSchemeOp::TCompactionSelectorConstructorContainer SerializeToProto() const { // by-value convenience overload
+        NKikimrSchemeOp::TCompactionSelectorConstructorContainer result;
+        SerializeToProto(result);
+        return result;
+    }
+    virtual TString GetClassName() const = 0;
+};
+
+class TSelectorConstructorContainer: public NBackgroundTasks::TInterfaceProtoContainer<ISelectorConstructor> {
+private:
+ using TBase = NBackgroundTasks::TInterfaceProtoContainer<ISelectorConstructor>;
+
+public:
+ using TBase::TBase; // inherits every constructor of TInterfaceProtoContainer<ISelectorConstructor>; nothing else is added
+};
+
+} // namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/selector/empty.cpp b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/selector/empty.cpp
new file mode 100644
index 00000000000..2e12585987b
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/selector/empty.cpp
@@ -0,0 +1,26 @@
+#include "empty.h"
+
+#include <ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/selector/empty.h>
+
+namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets {
+
+std::shared_ptr<IPortionsSelector> TEmptySelectorConstructor::DoBuildSelector() const {
+ return std::make_shared<TEmptyPortionsSelector>(GetName()); // the selector receives only this constructor's name
+}
+
+TConclusionStatus TEmptySelectorConstructor::DoDeserializeFromJson(const NJson::TJsonValue& /*json*/) {
+ return TConclusionStatus::Success(); // no class-specific JSON fields are read
+}
+
+bool TEmptySelectorConstructor::DoDeserializeFromProto(const NKikimrSchemeOp::TCompactionSelectorConstructorContainer& proto) {
+    // This class has no fields of its own to read: parsing succeeds exactly
+    // when the container carries the `Empty` payload.
+    const bool carriesEmptyPayload = proto.HasEmpty();
+    return carriesEmptyPayload;
+}
+
+void TEmptySelectorConstructor::DoSerializeToProto(NKikimrSchemeOp::TCompactionSelectorConstructorContainer& proto) const {
+ *proto.MutableEmpty() = NKikimrSchemeOp::TCompactionSelectorConstructorContainer::TEmptySelector(); // assigns a default-constructed TEmptySelector; DoDeserializeFromProto checks HasEmpty()
+}
+
+} // namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/selector/empty.h b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/selector/empty.h
new file mode 100644
index 00000000000..50bb43b9983
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/selector/empty.h
@@ -0,0 +1,26 @@
+#pragma once
+#include "constructor.h"
+
+namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets {
+
+class TEmptySelectorConstructor: public ISelectorConstructor { // ISelectorConstructor implementation registered under the name "Empty"
+public:
+ static TString GetClassNameStatic() {
+ return "Empty"; // factory key; also what GetClassName() reports
+ }
+
+private:
+ virtual std::shared_ptr<IPortionsSelector> DoBuildSelector() const override;
+ virtual TConclusionStatus DoDeserializeFromJson(const NJson::TJsonValue& json) override;
+ virtual bool DoDeserializeFromProto(const NKikimrSchemeOp::TCompactionSelectorConstructorContainer& proto) override;
+ virtual void DoSerializeToProto(NKikimrSchemeOp::TCompactionSelectorConstructorContainer& proto) const override;
+
+ const static inline auto Registrator = TFactory::TRegistrator<TEmptySelectorConstructor>(GetClassNameStatic()); // registers this class in ISelectorConstructor::TFactory
+
+public:
+ virtual TString GetClassName() const override {
+ return GetClassNameStatic();
+ }
+};
+
+} // namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/selector/snapshot.cpp b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/selector/snapshot.cpp
new file mode 100644
index 00000000000..6428d5987a9
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/selector/snapshot.cpp
@@ -0,0 +1,37 @@
+#include "snapshot.h"
+
+#include <ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/selector/snapshot.h>
+
+namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets {
+
+std::shared_ptr<IPortionsSelector> TSnapshotSelectorConstructor::DoBuildSelector() const {
+ return std::make_shared<TSnapshotPortionsSelector>(Interval, GetName()); // built from the stored interval and this constructor's name
+}
+
+TConclusionStatus TSnapshotSelectorConstructor::DoDeserializeFromJson(const NJson::TJsonValue& jsonValue) {
+    const bool hasInterval = jsonValue.Has("interval"); // the key is optional; without it Interval is left as it is
+    if (hasInterval) {
+        if (auto parseStatus = Interval.DeserializeFromJson(jsonValue["interval"]); parseStatus.IsFail()) {
+            return parseStatus; // hand the interval parser's failure back unchanged
+        }
+    }
+    return TConclusionStatus::Success();
+}
+
+bool TSnapshotSelectorConstructor::DoDeserializeFromProto(const NKikimrSchemeOp::TCompactionSelectorConstructorContainer& proto) {
+    if (!proto.HasSnapshot()) {
+        return false; // the container does not carry the Snapshot payload
+    }
+    const auto& snapshotProto = proto.GetSnapshot();
+    if (auto parseStatus = Interval.DeserializeFromProto(snapshotProto.GetInterval()); parseStatus.IsFail()) {
+        AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "cannot parse snapshot selector interval")("reason", parseStatus.GetErrorMessage());
+        return false;
+    }
+    return true;
+}
+
+void TSnapshotSelectorConstructor::DoSerializeToProto(NKikimrSchemeOp::TCompactionSelectorConstructorContainer& proto) const {
+ *proto.MutableSnapshot()->MutableInterval() = Interval.SerializeToProto(); // the interval is the only class-specific field written
+}
+
+} // namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/selector/snapshot.h b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/selector/snapshot.h
new file mode 100644
index 00000000000..a5a9b7e4c1f
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/selector/snapshot.h
@@ -0,0 +1,30 @@
+#pragma once
+#include "constructor.h"
+
+#include <ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/selector/snapshot.h>
+
+namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets {
+
+// Selector constructor registered in the factory under the class name "Snapshot".
+// It stores a data snapshot interval that can be read from JSON or proto and written back to proto.
+class TSnapshotSelectorConstructor: public ISelectorConstructor {
+public:
+    static TString GetClassNameStatic() {
+        return "Snapshot";
+    }
+
+    TString GetClassName() const override {
+        return GetClassNameStatic();
+    }
+
+private:
+    TDataSnapshotInterval Interval;
+
+    std::shared_ptr<IPortionsSelector> DoBuildSelector() const override;
+    TConclusionStatus DoDeserializeFromJson(const NJson::TJsonValue& json) override;
+    bool DoDeserializeFromProto(const NKikimrSchemeOp::TCompactionSelectorConstructorContainer& proto) override;
+    void DoSerializeToProto(NKikimrSchemeOp::TCompactionSelectorConstructorContainer& proto) const override;
+
+    // Static registration object; it must stay below GetClassNameStatic(), which its initializer calls.
+    const static inline auto Registrator = TFactory::TRegistrator<TSnapshotSelectorConstructor>(GetClassNameStatic());
+};
+
+} // namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/selector/transparent.cpp b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/selector/transparent.cpp
new file mode 100644
index 00000000000..1555456f57a
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/selector/transparent.cpp
@@ -0,0 +1,26 @@
+#include "transparent.h"
+
+#include <ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/selector/transparent.h>
+
+namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets {
+
+// Creates the runtime transparent selector; the only thing passed on is this constructor's name.
+std::shared_ptr<IPortionsSelector> TTransparentSelectorConstructor::DoBuildSelector() const {
+    std::shared_ptr<IPortionsSelector> selector = std::make_shared<TTransparentPortionsSelector>(GetName());
+    return selector;
+}
+
+// Nothing is read from the JSON payload, so any value is accepted.
+TConclusionStatus TTransparentSelectorConstructor::DoDeserializeFromJson([[maybe_unused]] const NJson::TJsonValue& json) {
+    return TConclusionStatus::Success();
+}
+
+// Accepts the container only when its `Transparent` branch is present; no further fields are read.
+bool TTransparentSelectorConstructor::DoDeserializeFromProto(const NKikimrSchemeOp::TCompactionSelectorConstructorContainer& proto) {
+    return proto.HasTransparent();
+}
+
+// Assigns a default-constructed TTransparentSelector message to the container's `Transparent` field.
+void TTransparentSelectorConstructor::DoSerializeToProto(NKikimrSchemeOp::TCompactionSelectorConstructorContainer& proto) const {
+    using TProtoSelector = NKikimrSchemeOp::TCompactionSelectorConstructorContainer::TTransparentSelector;
+    *proto.MutableTransparent() = TProtoSelector();
+}
+
+} // namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/selector/transparent.h b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/selector/transparent.h
new file mode 100644
index 00000000000..53baadaef13
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/selector/transparent.h
@@ -0,0 +1,26 @@
+#pragma once
+#include "constructor.h"
+
+namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets {
+
+// Selector constructor registered in the factory under the class name "Transparent".
+// It declares no data members of its own.
+class TTransparentSelectorConstructor: public ISelectorConstructor {
+public:
+    static TString GetClassNameStatic() {
+        return "Transparent";
+    }
+
+    TString GetClassName() const override {
+        return GetClassNameStatic();
+    }
+
+private:
+    std::shared_ptr<IPortionsSelector> DoBuildSelector() const override;
+    TConclusionStatus DoDeserializeFromJson(const NJson::TJsonValue& json) override;
+    bool DoDeserializeFromProto(const NKikimrSchemeOp::TCompactionSelectorConstructorContainer& proto) override;
+    void DoSerializeToProto(NKikimrSchemeOp::TCompactionSelectorConstructorContainer& proto) const override;
+
+    // Static registration object; it must stay below GetClassNameStatic(), which its initializer calls.
+    const static inline auto Registrator = TFactory::TRegistrator<TTransparentSelectorConstructor>(GetClassNameStatic());
+};
+
+} // namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/selector/ya.make b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/selector/ya.make
new file mode 100644
index 00000000000..874ee0a40e2
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/selector/ya.make
@@ -0,0 +1,17 @@
+# Library with the constructors of portion selectors (empty, transparent and snapshot variants).
+# NOTE(review): the selector sources are listed as GLOBAL, presumably so that the static factory
+# registrators declared in their headers are always linked in - confirm against the ya.make documentation.
+LIBRARY()
+
+SRCS(
+ constructor.cpp
+ GLOBAL empty.cpp
+ GLOBAL transparent.cpp
+ GLOBAL snapshot.cpp
+)
+
+PEERDIR(
+ contrib/libs/apache/arrow
+ ydb/core/protos
+ ydb/core/formats/arrow
+ ydb/core/tx/columnshard/engines/changes/abstract
+)
+
+END()
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/ya.make b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/ya.make
index a06b6cdb0be..eeb97ce4615 100644
--- a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/ya.make
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/ya.make
@@ -2,8 +2,6 @@ LIBRARY()
SRCS(
GLOBAL constructor.cpp
- GLOBAL zero_level.cpp
- GLOBAL one_layer.cpp
)
PEERDIR(
@@ -11,6 +9,8 @@ PEERDIR(
ydb/core/protos
ydb/core/formats/arrow
ydb/core/tx/columnshard/engines/changes/abstract
+ ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/selector
+ ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/level
)
END()
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/accumulation_level.h b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/accumulation_level.h
deleted file mode 100644
index 4718a94c93c..00000000000
--- a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/accumulation_level.h
+++ /dev/null
@@ -1,115 +0,0 @@
-#pragma once
-#include "abstract.h"
-#include "counters.h"
-
-namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets {
-
-class TAccumulationLevelPortions: public IPortionsLevel {
-private:
- using TBase = IPortionsLevel;
- const TLevelCounters LevelCounters;
-
- std::set<TOrderedPortion> Portions;
-
- virtual std::optional<TPortionsChain> DoGetAffectedPortions(
- const NArrow::TSimpleRow& /*from*/, const NArrow::TSimpleRow& /*to*/) const override {
- return std::nullopt;
- }
-
- virtual ui64 DoGetAffectedPortionBytes(const NArrow::TSimpleRow& /*from*/, const NArrow::TSimpleRow& /*to*/) const override {
- return 0;
- }
-
- virtual ui64 DoGetWeight() const override {
- if (PortionsInfo.GetCount() <= 1) {
- return 0;
- }
-
- THashSet<ui64> portionIds;
- ui64 affectedRawBytes = 0;
- auto chain =
- NextLevel->GetAffectedPortions(Portions.begin()->GetPortion()->IndexKeyStart(), Portions.rbegin()->GetPortion()->IndexKeyEnd());
- if (chain) {
- auto it = Portions.begin();
- auto itNext = chain->GetPortions().begin();
- while (it != Portions.end() && itNext != chain->GetPortions().end()) {
- const auto& nextLevelPortion = *itNext;
- if (nextLevelPortion->IndexKeyEnd() < it->GetPortion()->IndexKeyStart()) {
- ++itNext;
- } else if (it->GetPortion()->IndexKeyEnd() < nextLevelPortion->IndexKeyStart()) {
- ++it;
- } else {
- if (portionIds.emplace(nextLevelPortion->GetPortionId()).second) {
- affectedRawBytes += nextLevelPortion->GetTotalRawBytes();
- }
- ++itNext;
- }
- }
- }
- const ui64 mb = ((affectedRawBytes + PortionsInfo.GetRawBytes()) >> 20) + 1;
- return 1000.0 * PortionsInfo.GetCount() * PortionsInfo.GetCount() / mb;
- }
-
- virtual TInstant DoGetWeightExpirationInstant() const override {
- return TInstant::Max();
- }
-
-public:
- TAccumulationLevelPortions(const ui64 levelId, const std::shared_ptr<IPortionsLevel>& nextLevel, const TLevelCounters& levelCounters)
- : TBase(levelId, nextLevel)
- , LevelCounters(levelCounters) {
- }
-
- virtual bool IsLocked(const std::shared_ptr<NDataLocks::TManager>& locksManager) const override {
- for (auto&& i : Portions) {
- if (locksManager->IsLocked(*i.GetPortion(), NDataLocks::ELockCategory::Compaction)) {
- return true;
- }
- }
- return false;
- }
-
- virtual void DoModifyPortions(const std::vector<TPortionInfo::TPtr>& add, const std::vector<TPortionInfo::TPtr>& remove) override {
- for (auto&& i : remove) {
- auto it = Portions.find(i);
- AFL_VERIFY(it != Portions.end());
- AFL_VERIFY(it->GetPortion()->GetPortionId() == i->GetPortionId());
- PortionsInfo.RemovePortion(i);
- Portions.erase(it);
- LevelCounters.Portions->RemovePortion(i);
- }
- for (auto&& i : add) {
- AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "add_accum")("portion_id", i->GetPortionId())(
- "blob_size", i->GetTotalBlobBytes());
- AFL_VERIFY(Portions.emplace(i).second);
- PortionsInfo.AddPortion(i);
- LevelCounters.Portions->AddPortion(i);
- }
- }
-
- virtual TCompactionTaskData DoGetOptimizationTask() const override {
- AFL_VERIFY(Portions.size());
- std::shared_ptr<IPortionsLevel> targetLevel = GetNextLevel();
- AFL_VERIFY(targetLevel);
- TCompactionTaskData result(targetLevel->GetLevelId());
- {
- for (auto&& i : Portions) {
- result.AddCurrentLevelPortion(
- i.GetPortion(), targetLevel->GetAffectedPortions(i.GetPortion()->IndexKeyStart(), i.GetPortion()->IndexKeyEnd()), true);
- if (!result.CanTakeMore()) {
- result.SetStopSeparation(i.GetPortion()->IndexKeyStart());
- break;
- }
- }
- }
- return result;
- }
-
- virtual NArrow::NMerger::TIntervalPositions DoGetBucketPositions(const std::shared_ptr<arrow::Schema>& /*pkSchema*/) const override {
- AFL_VERIFY(false);
- NArrow::NMerger::TIntervalPositions result;
- return result;
- }
-};
-
-} // namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/abstract.cpp b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/abstract.cpp
index 84e224f68a5..84e224f68a5 100644
--- a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/abstract.cpp
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/abstract.cpp
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/abstract.h b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/abstract.h
index 401de122d90..1a5579c3df4 100644
--- a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/abstract.h
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/abstract.h
@@ -3,8 +3,10 @@
#include <ydb/core/tx/columnshard/engines/changes/general_compaction.h>
#include <ydb/core/tx/columnshard/engines/portions/portion_info.h>
+#include <ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/selector/abstract.h>
#include <ydb/library/formats/arrow/replace_key.h>
+#include <ydb/services/bg_tasks/abstract/interface.h>
namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets {
@@ -12,8 +14,6 @@ class TOrderedPortion {
private:
TPortionInfo::TConstPtr Portion;
NArrow::TSimpleRow Start;
- ui64 PortionId;
- NArrow::TSimpleRow StartPosition;
public:
const TPortionInfo::TConstPtr& GetPortion() const {
@@ -25,23 +25,14 @@ public:
return Start;
}
- const NArrow::TSimpleRow& GetStartPosition() const {
- AFL_VERIFY(Portion);
- return StartPosition;
- }
-
TOrderedPortion(const TPortionInfo::TConstPtr& portion)
: Portion(portion)
- , Start(portion->IndexKeyStart())
- , PortionId(portion->GetPortionId())
- , StartPosition(Portion->IndexKeyStart()) {
+ , Start(portion->IndexKeyStart()) {
}
TOrderedPortion(const TPortionInfo::TPtr& portion)
: Portion(portion)
- , Start(portion->IndexKeyStart())
- , PortionId(portion->GetPortionId())
- , StartPosition(Portion->IndexKeyStart()) {
+ , Start(portion->IndexKeyStart()) {
}
friend bool operator<(const NArrow::TSimpleRow& item, const TOrderedPortion& portion) {
@@ -63,13 +54,19 @@ public:
}
bool operator<(const TOrderedPortion& item) const {
+ AFL_VERIFY(Portion->GetPathId() == item.Portion->GetPathId());
auto cmp = Start.CompareNotNull(item.Start);
if (cmp == std::partial_ordering::equivalent) {
- return PortionId < item.PortionId;
+ return Portion->GetPortionId() < item.Portion->GetPortionId();
} else {
return cmp == std::partial_ordering::less;
}
}
+
+ // Equality is decided by portion id alone; the start key used by operator< is not compared.
+ // Comparing portions that belong to different paths is treated as a programming error (verified).
+ bool operator==(const TOrderedPortion& item) const {
+ AFL_VERIFY(Portion->GetPathId() == item.Portion->GetPathId());
+ return Portion->GetPortionId() == item.Portion->GetPortionId();
+ }
};
class TChainAddress {
@@ -282,6 +279,46 @@ public:
}
};
+// Interface that decides whether a group of portions counts as overloaded.
+class IOverloadChecker {
+public:
+    virtual ~IOverloadChecker() = default;
+
+    // Public entry point; the decision itself is made by the implementation's DoIsOverloaded().
+    bool IsOverloaded(const TSimplePortionsGroupInfo& portionsData) const {
+        const bool overloaded = DoIsOverloaded(portionsData);
+        return overloaded;
+    }
+
+private:
+    virtual bool DoIsOverloaded(const TSimplePortionsGroupInfo& portionsData) const = 0;
+};
+
+// Checker that never reports overload, whatever the portions statistics are.
+class TNoOverloadChecker: public IOverloadChecker {
+private:
+    bool DoIsOverloaded(const TSimplePortionsGroupInfo& /*portionsData*/) const override {
+        return false;
+    }
+};
+
+// Checker with two optional limits: the number of portions and their total blob bytes.
+// Overload is reported when a configured limit is strictly exceeded; an unset limit is ignored.
+class TLimitsOverloadChecker: public IOverloadChecker {
+private:
+    const std::optional<ui64> PortionsCountLimit;
+    const std::optional<ui64> PortionBlobsSizeLimit;
+
+    bool DoIsOverloaded(const TSimplePortionsGroupInfo& portionsData) const override {
+        const bool countExceeded = PortionsCountLimit && *PortionsCountLimit < static_cast<ui64>(portionsData.GetCount());
+        if (countExceeded) {
+            return true;
+        }
+        const bool bytesExceeded =
+            PortionBlobsSizeLimit && *PortionBlobsSizeLimit < static_cast<ui64>(portionsData.GetBlobBytes());
+        return bytesExceeded;
+    }
+
+public:
+    TLimitsOverloadChecker(const std::optional<ui64> portionsCountLimit, const std::optional<ui64> portionBlobsSizeLimit)
+        : PortionsCountLimit(portionsCountLimit)
+        , PortionBlobsSizeLimit(portionBlobsSizeLimit) {
+    }
+};
+
class IPortionsLevel {
private:
virtual void DoModifyPortions(const std::vector<TPortionInfo::TPtr>& add, const std::vector<TPortionInfo::TPtr>& remove) = 0;
@@ -301,24 +338,48 @@ private:
}
YDB_READONLY(ui64, LevelId, 0);
+ std::vector<std::shared_ptr<IPortionsSelector>> Selectors;
+ std::vector<TSimplePortionsGroupInfo> SelectivePortionsInfo;
+
+ TSimplePortionsGroupInfo* PortionsInfo = nullptr;
+ std::shared_ptr<IOverloadChecker> OverloadChecker = std::make_shared<TNoOverloadChecker>();
+ std::shared_ptr<IPortionsSelector> DefaultPortionsSelector;
+ const TLevelCounters LevelCounters;
protected:
std::shared_ptr<IPortionsLevel> NextLevel;
- TSimplePortionsGroupInfo PortionsInfo;
mutable std::optional<TInstant> PredOptimization = TInstant::Now();
public:
- virtual bool IsOverloaded() const {
+ virtual bool IsAppropriatePortionToMove(const TPortionAccessorConstructor& /*info*/) const {
return false;
}
+ // Base-class default: no portion is reported as appropriate to store; derived levels may override.
+ // NOTE(review): the exact meaning of "store" is decided by the callers, which are not visible here.
+ virtual bool IsAppropriatePortionToStore(const TPortionAccessorConstructor& /*info*/) const {
+ return false;
+ }
+
+ // Read-only access to the statistics object PortionsInfo points to (verified to be set).
+ const TSimplePortionsGroupInfo& GetPortionsInfo() const {
+ AFL_VERIFY(PortionsInfo);
+ return *PortionsInfo;
+ }
+
+ // Mutable access to the statistics object PortionsInfo points to (verified to be set).
+ TSimplePortionsGroupInfo& MutablePortionsInfo() {
+ AFL_VERIFY(PortionsInfo);
+ return *PortionsInfo;
+ }
+
+ // Passes the statistics returned by GetPortionsInfo() to the configured overload checker.
+ bool IsOverloaded() const {
+     const TSimplePortionsGroupInfo& levelInfo = GetPortionsInfo();
+     return OverloadChecker->IsOverloaded(levelInfo);
+ }
+
bool HasData() const {
- return PortionsInfo.GetCount();
+ return GetPortionsInfo().GetCount();
}
virtual std::optional<double> GetPackKff() const {
- if (PortionsInfo.GetRawBytes()) {
- return 1.0 * PortionsInfo.GetBlobBytes() / PortionsInfo.GetRawBytes();
+ if (GetPortionsInfo().GetRawBytes()) {
+ return 1.0 * GetPortionsInfo().GetBlobBytes() / GetPortionsInfo().GetRawBytes();
} else if (!NextLevel) {
return std::nullopt;
} else {
@@ -326,18 +387,30 @@ public:
}
}
- const TSimplePortionsGroupInfo& GetPortionsInfo() const {
- return PortionsInfo;
- }
-
const std::shared_ptr<IPortionsLevel>& GetNextLevel() const {
return NextLevel;
}
virtual ~IPortionsLevel() = default;
- IPortionsLevel(const ui64 levelId, const std::shared_ptr<IPortionsLevel>& nextLevel)
+ // Builds a level that keeps one statistics entry per selector.
+ // `selectors` must contain exactly one selector named `defaultSelectorName` (verified below);
+ // PortionsInfo is pointed at that selector's entry.
+ // A null `overloadChecker` is replaced with TNoOverloadChecker.
+ IPortionsLevel(const ui64 levelId, const std::shared_ptr<IPortionsLevel>& nextLevel,
+ const std::shared_ptr<IOverloadChecker>& overloadChecker, const TLevelCounters levelCounters,
+ const std::vector<std::shared_ptr<IPortionsSelector>>& selectors, const TString& defaultSelectorName)
: LevelId(levelId)
+ , Selectors(selectors)
+ , OverloadChecker(overloadChecker ? overloadChecker : std::make_shared<TNoOverloadChecker>())
+ , LevelCounters(levelCounters)
, NextLevel(nextLevel) {
+ // Size the vector before taking the address of one of its elements: PortionsInfo points into it.
+ // NOTE(review): a later resize, or a copy/move of the level, would leave that raw pointer stale - confirm neither happens.
+ SelectivePortionsInfo.resize(Selectors.size());
+ ui32 idx = 0;
+ for (auto&& i : selectors) {
+ if (i->GetName() == defaultSelectorName) {
+ // A second selector with the default name is a configuration error.
+ AFL_VERIFY(!DefaultPortionsSelector);
+ DefaultPortionsSelector = i;
+ PortionsInfo = &SelectivePortionsInfo[idx];
+ }
+ ++idx;
+ }
+ // The default selector must have been found among `selectors`.
+ AFL_VERIFY(DefaultPortionsSelector);
}
bool CanTakePortion(const TPortionInfo::TConstPtr& portion) const {
@@ -361,8 +434,14 @@ public:
NJson::TJsonValue result = NJson::JSON_MAP;
result.InsertValue("level", LevelId);
result.InsertValue("weight", GetWeight());
- result.InsertValue("portions", PortionsInfo.SerializeToJson());
result.InsertValue("details", DoSerializeToJson());
+ auto& selectiveJson = result.InsertValue("selectivity", NJson::JSON_MAP);
+ ui32 idx = 0;
+ for (auto&& i : SelectivePortionsInfo) {
+ selectiveJson.InsertValue(Selectors[idx]->GetName(), i.SerializeToJson());
+ ++idx;
+ }
+
return result;
}
@@ -379,7 +458,34 @@ public:
}
void ModifyPortions(const std::vector<TPortionInfo::TPtr>& add, const std::vector<TPortionInfo::TPtr>& remove) {
- return DoModifyPortions(add, remove);
+    // Every selector keeps its own statistics entry. Only the portions accepted by the default
+    // selector are reflected in LevelCounters and handed to the level implementation.
+    std::vector<TPortionInfo::TPtr> defaultAdded;
+    std::vector<TPortionInfo::TPtr> defaultRemoved;
+    ui32 selectorIdx = 0;
+    for (const std::shared_ptr<IPortionsSelector>& selector : Selectors) {
+        const bool isDefaultSelector = (selector == DefaultPortionsSelector);
+        TSimplePortionsGroupInfo& selectorInfo = SelectivePortionsInfo[selectorIdx];
+        ++selectorIdx;
+        for (const auto& portion : remove) {
+            // A null selector accepts every portion.
+            if (!selector || selector->IsAppropriate(portion)) {
+                if (isDefaultSelector) {
+                    defaultRemoved.emplace_back(portion);
+                    LevelCounters.Portions->RemovePortion(portion);
+                }
+                selectorInfo.RemovePortion(*portion);
+            }
+        }
+        for (const auto& portion : add) {
+            if (!selector || selector->IsAppropriate(portion)) {
+                if (isDefaultSelector) {
+                    defaultAdded.emplace_back(portion);
+                    LevelCounters.Portions->AddPortion(portion);
+                }
+                selectorInfo.AddPortion(*portion);
+                // The Optimized feature is switched on only when this level has no next level.
+                portion->InitRuntimeFeature(TPortionInfo::ERuntimeFeature::Optimized, !NextLevel);
+            }
+        }
+    }
+    return DoModifyPortions(defaultAdded, defaultRemoved);
}
ui64 GetWeight() const {
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/common_level.cpp b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/common_level.cpp
index 6acb8399e42..490c4ed0743 100644
--- a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/common_level.cpp
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/common_level.cpp
@@ -7,17 +7,13 @@ void TOneLayerPortions::DoModifyPortions(const std::vector<TPortionInfo::TPtr>&
auto it = Portions.find(TOrderedPortion(i));
AFL_VERIFY(it != Portions.end());
AFL_VERIFY(it->GetPortion()->GetPortionId() == i->GetPortionId());
- PortionsInfo.RemovePortion(i);
Portions.erase(it);
- LevelCounters.Portions->RemovePortion(i);
}
TStringBuilder sb;
for (auto&& i : add) {
sb << i->GetPortionId() << ",";
auto info = Portions.emplace(i);
- i->AddRuntimeFeature(TPortionInfo::ERuntimeFeature::Optimized);
AFL_VERIFY(info.second);
- PortionsInfo.AddPortion(i);
if (StrictOneLayer) {
{
auto it = info.first;
@@ -41,7 +37,6 @@ void TOneLayerPortions::DoModifyPortions(const std::vector<TPortionInfo::TPtr>&
}
}
}
- LevelCounters.Portions->AddPortion(i);
}
}
@@ -55,7 +50,7 @@ TCompactionTaskData TOneLayerPortions::DoGetOptimizationTask() const {
if (itFwd != Portions.begin()) {
--itBkwd;
}
- while (GetLevelBlobBytesLimit() * 0.5 + compactedData < (ui64)PortionsInfo.GetBlobBytes() &&
+ while (GetLevelBlobBytesLimit() * 0.5 + compactedData < (ui64)GetPortionsInfo().GetBlobBytes() &&
(itBkwd != Portions.begin() || itFwd != Portions.end()) && result.CanTakeMore()) {
if (itFwd != Portions.end() &&
(itBkwd == Portions.begin() || itBkwd->GetPortion()->GetTotalBlobBytes() <= itFwd->GetPortion()->GetTotalBlobBytes())) {
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/common_level.h b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/common_level.h
index 59b2cf69a92..645c3f17380 100644
--- a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/common_level.h
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/common_level.h
@@ -9,7 +9,6 @@ private:
using TBase = IPortionsLevel;
std::set<TOrderedPortion, std::less<>> Portions;
- const TLevelCounters LevelCounters;
const double BytesLimitFraction = 1;
const ui64 ExpectedPortionSize = (1 << 20);
const ui64 SizeLimitGuarantee = 0;
@@ -60,9 +59,9 @@ private:
if (!GetNextLevel()) {
return 0;
}
- if ((ui64)PortionsInfo.GetBlobBytes() > GetLevelBlobBytesLimit() && PortionsInfo.GetCount() > 2 &&
- (ui64)PortionsInfo.GetBlobBytes() > ExpectedPortionSize * 2) {
- return ((ui64)GetLevelId() << 48) + PortionsInfo.GetBlobBytes() - GetLevelBlobBytesLimit();
+ if ((ui64)GetPortionsInfo().GetBlobBytes() > GetLevelBlobBytesLimit() && GetPortionsInfo().GetCount() > 2 &&
+ (ui64)GetPortionsInfo().GetBlobBytes() > ExpectedPortionSize * 2) {
+ return ((ui64)GetLevelId() << 48) + GetPortionsInfo().GetBlobBytes() - GetLevelBlobBytesLimit();
} else {
return 0;
}
@@ -75,9 +74,9 @@ private:
public:
TOneLayerPortions(const ui64 levelId, const double bytesLimitFraction, const ui64 expectedPortionSize,
const std::shared_ptr<IPortionsLevel>& nextLevel, const std::shared_ptr<TSimplePortionsGroupInfo>& summaryPortionsInfo,
- const TLevelCounters& levelCounters, const ui64 sizeLimitGuarantee, const bool strictOneLayer = true)
- : TBase(levelId, nextLevel)
- , LevelCounters(levelCounters)
+ const TLevelCounters& levelCounters, const ui64 sizeLimitGuarantee, const std::vector<std::shared_ptr<IPortionsSelector>>& selectors,
+ const TString& defaultSelectorName, const bool strictOneLayer = true)
+ : TBase(levelId, nextLevel, nullptr, levelCounters, selectors, defaultSelectorName)
, BytesLimitFraction(bytesLimitFraction)
, ExpectedPortionSize(expectedPortionSize)
, SizeLimitGuarantee(sizeLimitGuarantee)
@@ -136,7 +135,7 @@ public:
virtual NArrow::NMerger::TIntervalPositions DoGetBucketPositions(const std::shared_ptr<arrow::Schema>& /*pkSchema*/) const override {
NArrow::NMerger::TIntervalPositions result;
for (auto&& i : Portions) {
- result.AddPosition(i.GetStartPosition().BuildSortablePosition(), false);
+ result.AddPosition(i.GetStart().BuildSortablePosition(), false);
}
return result;
}
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/counters.cpp b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/counters.cpp
index a429ee576fd..a429ee576fd 100644
--- a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/counters.cpp
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/counters.cpp
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/counters.h b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/counters.h
index 95d4924b3ec..95d4924b3ec 100644
--- a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/counters.h
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/counters.h
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/ya.make b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/ya.make
new file mode 100644
index 00000000000..ac6e3e1f369
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/ya.make
@@ -0,0 +1,17 @@
+# Library with the planner's level sources: the abstract level, the zero level, the common level and the level counters.
+LIBRARY()
+
+SRCS(
+ abstract.cpp
+ zero_level.cpp
+ common_level.cpp
+ counters.cpp
+)
+
+PEERDIR(
+ contrib/libs/apache/arrow
+ ydb/core/protos
+ ydb/core/formats/arrow
+ ydb/core/tx/columnshard/engines/changes/abstract
+)
+
+END()
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/zero_level.cpp b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/zero_level.cpp
index 78a75619be5..cc008cca2ca 100644
--- a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/zero_level.cpp
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/zero_level.cpp
@@ -26,7 +26,7 @@ ui64 TZeroLevelPortions::DoGetWeight() const {
return 0;
}
if (PredOptimization && TInstant::Now() - *PredOptimization < DurationToDrop) {
- if (PortionsInfo.PredictPackedBlobBytes(GetPackKff()) < ExpectedBlobsSize) {
+ if (GetPortionsInfo().PredictPackedBlobBytes(GetPackKff()) < ExpectedBlobsSize) {
return 0;
}
}
@@ -57,8 +57,8 @@ ui64 TZeroLevelPortions::DoGetWeight() const {
}
*/
- const ui64 mb = (affectedRawBytes + PortionsInfo.GetRawBytes()) / 1000000 + 1;
- return 1000.0 * PortionsInfo.GetCount() * PortionsInfo.GetCount() / mb;
+ const ui64 mb = (affectedRawBytes + GetPortionsInfo().GetRawBytes()) / 1000000 + 1;
+ return 1000.0 * GetPortionsInfo().GetCount() * GetPortionsInfo().GetCount() / mb;
}
TInstant TZeroLevelPortions::DoGetWeightExpirationInstant() const {
@@ -69,15 +69,13 @@ TInstant TZeroLevelPortions::DoGetWeightExpirationInstant() const {
}
TZeroLevelPortions::TZeroLevelPortions(const ui32 levelIdx, const std::shared_ptr<IPortionsLevel>& nextLevel,
- const TLevelCounters& levelCounters, const TDuration durationToDrop, const ui64 expectedBlobsSize, const ui64 portionsCountAvailable,
- const std::optional<ui64> portionsCountLimit, const std::optional<ui64> portionsSizeLimit)
- : TBase(levelIdx, nextLevel)
- , LevelCounters(levelCounters)
+ const TLevelCounters& levelCounters, const std::shared_ptr<IOverloadChecker>& overloadChecker, const TDuration durationToDrop,
+ const ui64 expectedBlobsSize, const ui64 portionsCountAvailable, const std::vector<std::shared_ptr<IPortionsSelector>>& selectors,
+ const TString& defaultSelectorName)
+ : TBase(levelIdx, nextLevel, overloadChecker, levelCounters, selectors, defaultSelectorName)
, DurationToDrop(durationToDrop)
, ExpectedBlobsSize(expectedBlobsSize)
- , PortionsCountAvailable(portionsCountAvailable)
- , PortionsCountLimit(portionsCountLimit)
- , PortionsSizeLimit(portionsSizeLimit) {
+ , PortionsCountAvailable(portionsCountAvailable) {
if (DurationToDrop != TDuration::Max() && PredOptimization) {
*PredOptimization -= TDuration::Seconds(RandomNumber<ui32>(DurationToDrop.Seconds()));
}
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/zero_level.h b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/zero_level.h
index 76f626cfd2c..9b8a7dd6435 100644
--- a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/zero_level.h
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/zero_level.h
@@ -7,50 +7,12 @@ namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets {
class TZeroLevelPortions: public IPortionsLevel {
private:
using TBase = IPortionsLevel;
- const TLevelCounters LevelCounters;
const TDuration DurationToDrop;
const ui64 ExpectedBlobsSize;
const ui64 PortionsCountAvailable;
- const std::optional<ui64> PortionsCountLimit;
- const std::optional<ui64> PortionsSizeLimit;
- class TOrderedPortion {
- private:
- YDB_READONLY_DEF(TPortionInfo::TConstPtr, Portion);
- public:
- TOrderedPortion(const TPortionInfo::TConstPtr& portion)
- : Portion(portion) {
- }
-
- TOrderedPortion(const TPortionInfo::TPtr& portion)
- : Portion(portion) {
- }
-
- bool operator==(const TOrderedPortion& item) const {
- return item.Portion->GetPathId() == Portion->GetPathId() && item.Portion->GetPortionId() == Portion->GetPortionId();
- }
-
- bool operator<(const TOrderedPortion& item) const {
- auto cmp = Portion->IndexKeyStart().CompareNotNull(item.Portion->IndexKeyStart());
- if (cmp == std::partial_ordering::equivalent) {
- return Portion->GetPortionId() < item.Portion->GetPortionId();
- } else {
- return cmp == std::partial_ordering::less;
- }
- }
- };
std::set<TOrderedPortion> Portions;
- virtual bool IsOverloaded() const override {
- if (PortionsCountLimit && Portions.size() > *PortionsCountLimit) {
- return true;
- }
- if (PortionsSizeLimit && (ui64)PortionsInfo.GetBlobBytes() > (ui64)*PortionsSizeLimit) {
- return true;
- }
- return false;
- }
-
virtual NArrow::NMerger::TIntervalPositions DoGetBucketPositions(const std::shared_ptr<arrow::Schema>& /*pkSchema*/) const override {
return NArrow::NMerger::TIntervalPositions();
}
@@ -60,6 +22,14 @@ private:
return std::nullopt;
}
+ // True when the portion's total blobs size is strictly greater than ExpectedBlobsSize.
+ bool IsAppropriatePortionToMove(const TPortionAccessorConstructor& info) const override {
+     const auto portionBlobsSize = info.GetTotalBlobsSize();
+     return ExpectedBlobsSize < portionBlobsSize;
+ }
+
+ // This level reports every portion as appropriate to store; the argument is not inspected.
+ virtual bool IsAppropriatePortionToStore(const TPortionAccessorConstructor& /*info*/) const override {
+ return true;
+ }
+
virtual ui64 DoGetAffectedPortionBytes(const NArrow::TSimpleRow& /*from*/, const NArrow::TSimpleRow& /*to*/) const override {
return 0;
}
@@ -80,14 +50,9 @@ private:
if (!constructionFlag) {
AFL_VERIFY(Portions.emplace(i).second);
}
- PortionsInfo.AddPortion(i);
- LevelCounters.Portions->AddPortion(i);
- i->InitRuntimeFeature(TPortionInfo::ERuntimeFeature::Optimized, !NextLevel);
}
for (auto&& i : remove) {
AFL_VERIFY(Portions.erase(i));
- LevelCounters.Portions->RemovePortion(i);
- PortionsInfo.RemovePortion(i);
}
}
@@ -107,8 +72,8 @@ private:
public:
TZeroLevelPortions(const ui32 levelIdx, const std::shared_ptr<IPortionsLevel>& nextLevel, const TLevelCounters& levelCounters,
- const TDuration durationToDrop, const ui64 expectedBlobsSize, const ui64 portionsCountAvailable,
- const std::optional<ui64> portionsCountLimit, const std::optional<ui64> portionsSizeLimit);
+ const std::shared_ptr<IOverloadChecker>& overloadChecker, const TDuration durationToDrop, const ui64 expectedBlobsSize,
+ const ui64 portionsCountAvailable, const std::vector<std::shared_ptr<IPortionsSelector>>& selectors, const TString& defaultSelectorName);
};
} // namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/optimizer.cpp b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/optimizer.cpp
index c9552fc719c..7ea35f94253 100644
--- a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/optimizer.cpp
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/optimizer.cpp
@@ -1,7 +1,9 @@
-#include "accumulation_level.h"
-#include "common_level.h"
#include "optimizer.h"
-#include "zero_level.h"
+
+#include "level/common_level.h"
+#include "level/zero_level.h"
+#include "selector/snapshot.h"
+#include "selector/transparent.h"
#include <ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/constructor/constructor.h>
@@ -10,11 +12,22 @@
namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets {
TOptimizerPlanner::TOptimizerPlanner(const TInternalPathId pathId, const std::shared_ptr<IStoragesManager>& storagesManager,
- const std::shared_ptr<arrow::Schema>& primaryKeysSchema, const std::vector<TLevelConstructorContainer>& levelConstructors)
+ const std::shared_ptr<arrow::Schema>& primaryKeysSchema, const std::vector<TLevelConstructorContainer>& levelConstructors,
+ const std::vector<TSelectorConstructorContainer>& selectors)
: TBase(pathId)
, Counters(std::make_shared<TCounters>())
, StoragesManager(storagesManager)
, PrimaryKeysSchema(primaryKeysSchema) {
+ {
+ std::set<TString> selectorNames;
+ for (auto&& i : selectors) {
+ AFL_VERIFY(selectorNames.emplace(i->GetName()).second);
+ Selectors.emplace_back(i->BuildSelector());
+ }
+ if (Selectors.empty()) {
+ Selectors = { std::make_shared<TTransparentPortionsSelector>("default") };
+ }
+ }
std::shared_ptr<IPortionsLevel> nextLevel;
/*
const ui64 maxPortionBlobBytes = (ui64)1 << 20;
@@ -26,17 +39,16 @@ TOptimizerPlanner::TOptimizerPlanner(const TInternalPathId pathId, const std::sh
ui32 idx = levelConstructors.size();
for (auto it = levelConstructors.rbegin(); it != levelConstructors.rend(); ++it) {
--idx;
- Levels.emplace_back((*it)->BuildLevel(nextLevel, idx, PortionsInfo, Counters->GetLevelCounters(idx)));
+ Levels.emplace_back((*it)->BuildLevel(nextLevel, idx, PortionsInfo, Counters->GetLevelCounters(idx), Selectors));
nextLevel = Levels.back();
}
} else {
Levels.emplace_back(std::make_shared<TZeroLevelPortions>(
- 2, nullptr, Counters->GetLevelCounters(2), TDuration::Max(), 1 << 20, 10, std::nullopt, std::nullopt));
- Levels.emplace_back(
- std::make_shared<TZeroLevelPortions>(
- 1, Levels.back(), Counters->GetLevelCounters(1), TDuration::Max(), 1 << 20, 10, std::nullopt, std::nullopt));
+ 2, nullptr, Counters->GetLevelCounters(2), nullptr, TDuration::Max(), 1 << 20, 10, Selectors, "default"));
+ Levels.emplace_back(std::make_shared<TZeroLevelPortions>(
+ 1, Levels.back(), Counters->GetLevelCounters(1), nullptr, TDuration::Max(), 1 << 20, 10, Selectors, "default"));
Levels.emplace_back(std::make_shared<TZeroLevelPortions>(
- 0, Levels.back(), Counters->GetLevelCounters(0), TDuration::Seconds(180), 1 << 20, 10, std::nullopt, std::nullopt));
+ 0, Levels.back(), Counters->GetLevelCounters(0), nullptr, TDuration::Seconds(180), 1 << 20, 10, Selectors, "default"));
}
std::reverse(Levels.begin(), Levels.end());
RefreshWeights();
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/optimizer.h b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/optimizer.h
index e34f678f896..53212a66045 100644
--- a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/optimizer.h
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/optimizer.h
@@ -1,11 +1,13 @@
#pragma once
-#include "abstract.h"
-#include "counters.h"
+#include "level/abstract.h"
+
#include <ydb/core/tx/columnshard/common/path_id.h>
+#include <ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.h>
namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets {
class TLevelConstructorContainer;
+class TSelectorConstructorContainer;
class TOptimizerPlanner: public IOptimizerPlanner {
private:
@@ -13,10 +15,22 @@ private:
std::shared_ptr<TCounters> Counters;
std::shared_ptr<TSimplePortionsGroupInfo> PortionsInfo = std::make_shared<TSimplePortionsGroupInfo>();
+ std::vector<std::shared_ptr<IPortionsSelector>> Selectors;
std::vector<std::shared_ptr<IPortionsLevel>> Levels;
std::map<ui64, std::shared_ptr<IPortionsLevel>, std::greater<ui64>> LevelsByWeight;
const std::shared_ptr<IStoragesManager> StoragesManager;
const std::shared_ptr<arrow::Schema> PrimaryKeysSchema;
+
+    virtual ui32 GetAppropriateLevel(const ui32 baseLevel, const TPortionAccessorConstructor& info) const override {
+        // Walk each adjacent pair (next - 1, next) from baseLevel upwards; the last pair that agrees to hand the portion over wins.
+        ui32 targetLevel = baseLevel;
+        for (ui32 next = baseLevel + 1; next < Levels.size(); ++next) { // NOTE(review): a refusing pair does not stop the scan - confirm level skipping is intended.
+            const bool handover = Levels[next - 1]->IsAppropriatePortionToMove(info) && Levels[next]->IsAppropriatePortionToStore(info);
+            targetLevel = handover ? next : targetLevel;
+        }
+        return targetLevel;
+    }
+
virtual bool DoIsOverloaded() const override {
for (auto&& i : Levels) {
if (i->IsOverloaded()) {
@@ -51,8 +65,7 @@ protected:
return false;
}
- virtual void DoModifyPortions(
- const THashMap<ui64, TPortionInfo::TPtr>& add, const THashMap<ui64, TPortionInfo::TPtr>& remove) override {
+ virtual void DoModifyPortions(const THashMap<ui64, TPortionInfo::TPtr>& add, const THashMap<ui64, TPortionInfo::TPtr>& remove) override {
std::vector<std::vector<TPortionInfo::TPtr>> removePortionsByLevel;
removePortionsByLevel.resize(Levels.size());
for (auto&& [_, i] : remove) {
@@ -91,7 +104,7 @@ protected:
if (i->GetTotalBlobBytes() > 512 * 1024 && i->GetMeta().GetProduced() != NPortion::EProduced::INSERTED) {
for (i32 levelIdx = Levels.size() - 1; levelIdx >= 0; --levelIdx) {
if (Levels[levelIdx]->CanTakePortion(i)) {
- Levels[levelIdx]->ModifyPortions({i}, {});
+ Levels[levelIdx]->ModifyPortions({ i }, {});
i->MutableMeta().ResetCompactionLevel(levelIdx);
break;
}
@@ -149,7 +162,8 @@ public:
}
TOptimizerPlanner(const TInternalPathId pathId, const std::shared_ptr<IStoragesManager>& storagesManager,
- const std::shared_ptr<arrow::Schema>& primaryKeysSchema, const std::vector<TLevelConstructorContainer>& levelConstructors);
+ const std::shared_ptr<arrow::Schema>& primaryKeysSchema, const std::vector<TLevelConstructorContainer>& levelConstructors,
+ const std::vector<TSelectorConstructorContainer>& selectors);
};
} // namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/accumulation_level.cpp b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/selector/abstract.cpp
index fcd7fcbb9bb..de23092161e 100644
--- a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/accumulation_level.cpp
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/selector/abstract.cpp
@@ -1,4 +1,4 @@
-#include "accumulation_level.h"
+#include "abstract.h"
namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets {
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/selector/abstract.h b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/selector/abstract.h
new file mode 100644
index 00000000000..2b240844b17
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/selector/abstract.h
@@ -0,0 +1,25 @@
+#pragma once
+#include <ydb/core/tx/columnshard/engines/portions/portion_info.h>
+
+#include <ydb/services/bg_tasks/abstract/interface.h>
+
+namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets {
+
+class IPortionsSelector { // Abstract named predicate over portions; concrete selectors implement DoIsAppropriate.
+private:
+ YDB_READONLY_DEF(TString, Name); // Selector name, set once from the constructor argument (accessor presumably generated by the macro - defined elsewhere).
+ virtual bool DoIsAppropriate(const TPortionInfo::TPtr& portionInfo) const = 0; // Extension point: the actual matching rule.
+
+public:
+ virtual ~IPortionsSelector() = default; // Virtual destructor for a class with virtual members.
+
+ IPortionsSelector(const TString& name) // Stores the given name; nothing else is initialised here.
+ : Name(name) {
+ }
+
+ bool IsAppropriate(const TPortionInfo::TPtr& portionInfo) const { // Public entry point; forwards to DoIsAppropriate and returns its result unchanged.
+ return DoIsAppropriate(portionInfo);
+ }
+};
+
+} // namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/selector/empty.cpp b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/selector/empty.cpp
new file mode 100644
index 00000000000..d5706c6ffed
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/selector/empty.cpp
@@ -0,0 +1,5 @@
+#include "empty.h"
+
+namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets {
+
+}
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/selector/empty.h b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/selector/empty.h
new file mode 100644
index 00000000000..de4ff29a440
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/selector/empty.h
@@ -0,0 +1,17 @@
+#pragma once
+#include "abstract.h"
+
+namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets {
+
+class TEmptyPortionsSelector: public IPortionsSelector { // Selector that matches nothing: DoIsAppropriate is always false.
+private:
+ using TBase = IPortionsSelector;
+ virtual bool DoIsAppropriate(const TPortionInfo::TPtr& /*portionInfo*/) const override { // The portion argument is ignored.
+ return false;
+ }
+
+public:
+ using TBase::TBase; // Reuses the base constructor taking the selector name.
+};
+
+} // namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/selector/snapshot.cpp b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/selector/snapshot.cpp
new file mode 100644
index 00000000000..666d5deb896
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/selector/snapshot.cpp
@@ -0,0 +1,5 @@
+#include "snapshot.h"
+
+namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets {
+
+}
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/selector/snapshot.h b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/selector/snapshot.h
new file mode 100644
index 00000000000..5bbab88fbc6
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/selector/snapshot.h
@@ -0,0 +1,93 @@
+#pragma once
+#include "abstract.h"
+
+namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets {
+
+/// Optional time window that is matched against the plan instants of a portion's record snapshots.
+///
+/// Either bound may be absent, which leaves that side of the window open. When both bounds are
+/// present they must be ordered (start <= finish); CheckPortion treats the window as half-open,
+/// [StartInstant, FinishInstant).
+class TDataSnapshotInterval {
+private:
+    YDB_READONLY_DEF(std::optional<TInstant>, StartInstant);
+    YDB_READONLY_DEF(std::optional<TInstant>, FinishInstant);
+
+    /// True when the bounds are ordered: at least one of them is absent, or start <= finish.
+    static bool IsOrdered(const std::optional<TInstant>& start, const std::optional<TInstant>& finish) {
+        return !start || !finish || *start <= *finish;
+    }
+
+public:
+    /// True when neither bound is set.
+    bool IsEmpty() const {
+        return !StartInstant && !FinishInstant;
+    }
+
+    bool operator==(const TDataSnapshotInterval& item) const = default;
+
+    TDataSnapshotInterval() = default;
+
+    /// Builds an interval from explicit bounds; a reversed pair of bounds fails the AFL_VERIFY check.
+    TDataSnapshotInterval(const std::optional<TInstant>& start, const std::optional<TInstant>& finish)
+        : StartInstant(start)
+        , FinishInstant(finish) {
+        AFL_VERIFY(IsOrdered(StartInstant, FinishInstant));
+    }
+
+    /// Reads the optional "start_seconds_utc" / "finish_seconds_utc" keys of a json map.
+    ///
+    /// A key that is missing keeps the current value of the corresponding bound. Returns a failed
+    /// status when the value is not a map, when a present key is not an unsigned integer, or when
+    /// the resulting bounds are reversed; in every failure case the object is left unchanged.
+    TConclusionStatus DeserializeFromJson(const NJson::TJsonValue& jsonValue) {
+        if (!jsonValue.IsMap()) {
+            return TConclusionStatus::Fail("json have to be a map");
+        }
+        // Parse into copies and commit at the end, so a rejected document cannot leave a half-updated interval.
+        std::optional<TInstant> start = StartInstant;
+        std::optional<TInstant> finish = FinishInstant;
+        if (jsonValue.Has("start_seconds_utc")) {
+            if (!jsonValue["start_seconds_utc"].IsUInteger()) {
+                return TConclusionStatus::Fail("json start_seconds_utc value have to be unsigned int");
+            }
+            start = TInstant::Seconds(jsonValue["start_seconds_utc"].GetUInteger());
+        }
+        if (jsonValue.Has("finish_seconds_utc")) {
+            if (!jsonValue["finish_seconds_utc"].IsUInteger()) {
+                return TConclusionStatus::Fail("json finish_seconds_utc value have to be unsigned int");
+            }
+            finish = TInstant::Seconds(jsonValue["finish_seconds_utc"].GetUInteger());
+        }
+        // Same invariant the constructor enforces, reported as an error instead of a verify failure.
+        if (!IsOrdered(start, finish)) {
+            return TConclusionStatus::Fail("json start_seconds_utc value must not exceed finish_seconds_utc");
+        }
+        StartInstant = start;
+        FinishInstant = finish;
+        return TConclusionStatus::Success();
+    }
+
+    /// Reads the optional StartSecondsUTC / FinishSecondsUTC fields of a proto message.
+    ///
+    /// A field that is not set keeps the current value of the corresponding bound. Returns a failed
+    /// status, leaving the object unchanged, when the resulting bounds are reversed.
+    template <class TProto>
+    TConclusionStatus DeserializeFromProto(const TProto& proto) {
+        std::optional<TInstant> start = StartInstant;
+        std::optional<TInstant> finish = FinishInstant;
+        if (proto.HasStartSecondsUTC()) {
+            start = TInstant::Seconds(proto.GetStartSecondsUTC());
+        }
+        if (proto.HasFinishSecondsUTC()) {
+            finish = TInstant::Seconds(proto.GetFinishSecondsUTC());
+        }
+        if (!IsOrdered(start, finish)) {
+            return TConclusionStatus::Fail("StartSecondsUTC must not exceed FinishSecondsUTC");
+        }
+        StartInstant = start;
+        FinishInstant = finish;
+        return TConclusionStatus::Success();
+    }
+
+    /// Writes each bound that is set into the proto as whole seconds; absent bounds are not written.
+    NKikimrSchemeOp::TCompactionSelectorConstructorContainer::TDataSnapshotInterval SerializeToProto() const {
+        NKikimrSchemeOp::TCompactionSelectorConstructorContainer::TDataSnapshotInterval result;
+        if (StartInstant) {
+            result.SetStartSecondsUTC(StartInstant->Seconds());
+        }
+        if (FinishInstant) {
+            result.SetFinishSecondsUTC(FinishInstant->Seconds());
+        }
+        return result;
+    }
+
+    /// Tells whether the portion falls into the window.
+    ///
+    /// Returns false when the plan instant of the portion's RecordSnapshotMax is before StartInstant,
+    /// or when the plan instant of its RecordSnapshotMin is at or after FinishInstant; true otherwise.
+    /// An absent bound never rejects a portion.
+    bool CheckPortion(const TPortionInfo::TPtr& p) const {
+        if (StartInstant && p->RecordSnapshotMax().GetPlanInstant() < *StartInstant) {
+            return false;
+        }
+        if (FinishInstant && *FinishInstant <= p->RecordSnapshotMin().GetPlanInstant()) {
+            return false;
+        }
+        return true;
+    }
+};
+
+class TSnapshotPortionsSelector: public IPortionsSelector { // Selector that accepts a portion exactly when the stored interval's CheckPortion does.
+private:
+ using TBase = IPortionsSelector;
+ const TDataSnapshotInterval DataSnapshotInterval; // Copied from the constructor argument and immutable afterwards.
+
+ virtual bool DoIsAppropriate(const TPortionInfo::TPtr& portionInfo) const override { // Delegates the decision to DataSnapshotInterval.CheckPortion.
+ return DataSnapshotInterval.CheckPortion(portionInfo);
+ }
+
+public:
+ TSnapshotPortionsSelector(const TDataSnapshotInterval& dataSnapshotInterval, const TString& name) // Argument order: interval first, selector name second.
+ : TBase(name)
+ , DataSnapshotInterval(dataSnapshotInterval) {
+ }
+};
+
+} // namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/selector/transparent.cpp b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/selector/transparent.cpp
new file mode 100644
index 00000000000..b58e84aad57
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/selector/transparent.cpp
@@ -0,0 +1,5 @@
+#include "transparent.h"
+
+namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets {
+
+}
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/selector/transparent.h b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/selector/transparent.h
new file mode 100644
index 00000000000..f5aea9bb2f8
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/selector/transparent.h
@@ -0,0 +1,17 @@
+#pragma once
+#include "abstract.h"
+
+namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets {
+
+class TTransparentPortionsSelector: public IPortionsSelector { // Selector that matches everything: DoIsAppropriate is always true.
+private:
+ using TBase = IPortionsSelector;
+ virtual bool DoIsAppropriate(const TPortionInfo::TPtr& /*portionInfo*/) const override { // The portion argument is ignored.
+ return true;
+ }
+
+public:
+ using TBase::TBase; // Reuses the base constructor taking the selector name.
+};
+
+} // namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/selector/ya.make b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/selector/ya.make
new file mode 100644
index 00000000000..e12bd5f5db4
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/selector/ya.make
@@ -0,0 +1,17 @@
+LIBRARY()
+
+SRCS(
+ abstract.cpp
+ snapshot.cpp
+ empty.cpp
+ transparent.cpp
+)
+
+PEERDIR(
+ contrib/libs/apache/arrow
+ ydb/core/protos
+ ydb/core/formats/arrow
+ ydb/core/tx/columnshard/engines/changes/abstract
+)
+
+END()
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/ya.make b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/ya.make
index 7eba1467e8b..3efeb2941fe 100644
--- a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/ya.make
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/ya.make
@@ -1,11 +1,7 @@
LIBRARY()
SRCS(
- abstract.cpp
- zero_level.cpp
- common_level.cpp
GLOBAL optimizer.cpp
- counters.cpp
)
PEERDIR(
@@ -13,6 +9,8 @@ PEERDIR(
ydb/core/protos
ydb/core/formats/arrow
ydb/core/tx/columnshard/engines/changes/abstract
+ ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level
+ ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/selector
)
END()