aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2023-08-21 16:48:38 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2023-08-21 17:11:19 +0300
commiteac8ca1f552726198b4d7a21fcdecf8954339262 (patch)
tree2b8e9be86d85ffc1de5e471074fbe746823b6d49
parented563c45a30cc549e23e0b672f0e890fa6849361 (diff)
downloadydb-eac8ca1f552726198b4d7a21fcdecf8954339262.tar.gz
KIKIMR-19092: simple compaction and optimizer for intervals correction
-rw-r--r--ydb/core/tx/columnshard/columnshard_impl.h2
-rw-r--r--ydb/core/tx/columnshard/engines/changes/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/tx/columnshard/engines/changes/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/tx/columnshard/engines/changes/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/tx/columnshard/engines/changes/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/tx/columnshard/engines/changes/general_compaction.cpp51
-rw-r--r--ydb/core/tx/columnshard/engines/changes/general_compaction.h28
-rw-r--r--ydb/core/tx/columnshard/engines/changes/ya.make1
-rw-r--r--ydb/core/tx/columnshard/engines/storage/CMakeLists.darwin-x86_64.txt2
-rw-r--r--ydb/core/tx/columnshard/engines/storage/CMakeLists.linux-aarch64.txt2
-rw-r--r--ydb/core/tx/columnshard/engines/storage/CMakeLists.linux-x86_64.txt2
-rw-r--r--ydb/core/tx/columnshard/engines/storage/CMakeLists.windows-x86_64.txt2
-rw-r--r--ydb/core/tx/columnshard/engines/storage/optimizer/CMakeLists.darwin-x86_64.txt21
-rw-r--r--ydb/core/tx/columnshard/engines/storage/optimizer/CMakeLists.linux-aarch64.txt22
-rw-r--r--ydb/core/tx/columnshard/engines/storage/optimizer/CMakeLists.linux-x86_64.txt22
-rw-r--r--ydb/core/tx/columnshard/engines/storage/optimizer/CMakeLists.txt17
-rw-r--r--ydb/core/tx/columnshard/engines/storage/optimizer/CMakeLists.windows-x86_64.txt21
-rw-r--r--ydb/core/tx/columnshard/engines/storage/optimizer/intervals_optimizer.cpp98
-rw-r--r--ydb/core/tx/columnshard/engines/storage/optimizer/intervals_optimizer.h359
-rw-r--r--ydb/core/tx/columnshard/engines/storage/optimizer/optimizer.cpp5
-rw-r--r--ydb/core/tx/columnshard/engines/storage/optimizer/optimizer.h55
-rw-r--r--ydb/core/tx/columnshard/engines/storage/optimizer/ut/ut_optimizer.cpp90
-rw-r--r--ydb/core/tx/columnshard/engines/storage/optimizer/ut/ya.make28
-rw-r--r--ydb/core/tx/columnshard/engines/storage/optimizer/ya.make18
-rw-r--r--ydb/core/tx/columnshard/engines/storage/ya.make1
25 files changed, 851 insertions, 0 deletions
diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h
index d1d9e8e31b..60c962bb95 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.h
+++ b/ydb/core/tx/columnshard/columnshard_impl.h
@@ -31,6 +31,7 @@ class TCompactColumnEngineChanges;
class TInGranuleCompactColumnEngineChanges;
class TSplitCompactColumnEngineChanges;
class TInsertColumnEngineChanges;
+class TGeneralCompactColumnEngineChanges;
}
namespace NKikimr::NColumnShard {
@@ -114,6 +115,7 @@ class TColumnShard
friend class NOlap::TSplitCompactColumnEngineChanges;
friend class NOlap::TInsertColumnEngineChanges;
friend class NOlap::TColumnEngineChanges;
+ friend class NOlap::TGeneralCompactColumnEngineChanges;
friend class TTxController;
diff --git a/ydb/core/tx/columnshard/engines/changes/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/engines/changes/CMakeLists.darwin-x86_64.txt
index 3aa52c3cbd..8aa7456a27 100644
--- a/ydb/core/tx/columnshard/engines/changes/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/tx/columnshard/engines/changes/CMakeLists.darwin-x86_64.txt
@@ -30,4 +30,5 @@ target_sources(columnshard-engines-changes PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/cleanup.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/mark_granules.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/with_appended.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp
)
diff --git a/ydb/core/tx/columnshard/engines/changes/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/engines/changes/CMakeLists.linux-aarch64.txt
index f068d3122d..e25f5d70d8 100644
--- a/ydb/core/tx/columnshard/engines/changes/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/tx/columnshard/engines/changes/CMakeLists.linux-aarch64.txt
@@ -31,4 +31,5 @@ target_sources(columnshard-engines-changes PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/cleanup.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/mark_granules.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/with_appended.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp
)
diff --git a/ydb/core/tx/columnshard/engines/changes/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/engines/changes/CMakeLists.linux-x86_64.txt
index f068d3122d..e25f5d70d8 100644
--- a/ydb/core/tx/columnshard/engines/changes/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/tx/columnshard/engines/changes/CMakeLists.linux-x86_64.txt
@@ -31,4 +31,5 @@ target_sources(columnshard-engines-changes PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/cleanup.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/mark_granules.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/with_appended.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp
)
diff --git a/ydb/core/tx/columnshard/engines/changes/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/engines/changes/CMakeLists.windows-x86_64.txt
index 3aa52c3cbd..8aa7456a27 100644
--- a/ydb/core/tx/columnshard/engines/changes/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/tx/columnshard/engines/changes/CMakeLists.windows-x86_64.txt
@@ -30,4 +30,5 @@ target_sources(columnshard-engines-changes PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/cleanup.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/mark_granules.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/with_appended.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp
)
diff --git a/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp b/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp
new file mode 100644
index 0000000000..7e07b925c7
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp
@@ -0,0 +1,51 @@
+#include "general_compaction.h"
+#include <ydb/core/tx/columnshard/columnshard_impl.h>
+#include <ydb/core/tx/columnshard/engines/storage/granule.h>
+
+namespace NKikimr::NOlap {
+
+// Restores the switched portions together with their blobs, converts each of
+// them to a batch in the latest schema, merges the batches into one and passes
+// the merged batch to MakeAppendedPortions; the result is stored in AppendedPortions.
+// The largest GetMinSnapshot() among the switched portions is passed along as the snapshot.
+TConclusionStatus TGeneralCompactColumnEngineChanges::DoConstructBlobs(TConstructionContext& context) noexcept {
+    const ui64 pathId = GranuleMeta->GetPathId();
+    std::vector<TPortionInfoWithBlobs> restoredPortions = TPortionInfoWithBlobs::RestorePortions(SwitchedPortions, Blobs);
+
+    // Pick the greatest "min snapshot" over all source portions.
+    std::optional<TSnapshot> newestSnapshot;
+    for (auto&& sourcePortion : SwitchedPortions) {
+        const auto& candidate = sourcePortion.GetMinSnapshot();
+        if (!newestSnapshot) {
+            newestSnapshot = candidate;
+        } else if (*newestSnapshot < candidate) {
+            newestSnapshot = candidate;
+        }
+    }
+    // At least one switched portion is required.
+    Y_VERIFY(newestSnapshot);
+
+    auto resultSchema = context.SchemaVersions.GetLastSchema();
+    std::vector<std::shared_ptr<arrow::RecordBatch>> sortedBatches;
+    for (auto&& restored : restoredPortions) {
+        // Each portion is read with the schema of its own snapshot and adapted to the result schema.
+        auto portionSchema = context.SchemaVersions.GetSchema(restored.GetPortionInfo().GetMinSnapshot());
+        sortedBatches.emplace_back(restored.GetBatch(*portionSchema, *resultSchema));
+        // The merge below relies on every input batch being sorted by the replace key.
+        Y_VERIFY(NArrow::IsSorted(sortedBatches.back(), resultSchema->GetIndexInfo().GetReplaceKey()));
+    }
+
+    auto mergedBatches = NArrow::MergeSortedBatches(sortedBatches, resultSchema->GetIndexInfo().SortReplaceDescription(), Max<size_t>());
+    // With an unlimited batch size exactly one merged batch is expected.
+    Y_VERIFY(mergedBatches.size() == 1);
+    auto singleBatch = mergedBatches.front();
+    AppendedPortions = MakeAppendedPortions(pathId, singleBatch, GranuleMeta->GetGranuleId(), *newestSnapshot, GranuleMeta.get(), context);
+
+    return TConclusionStatus::Success();
+}
+
+// Runs the base-class completion handler, then bumps the split-compaction
+// success/fail counter and the written blobs/bytes counters on the shard.
+void TGeneralCompactColumnEngineChanges::DoWriteIndexComplete(NColumnShard::TColumnShard& self, TWriteIndexCompleteContext& context) {
+    TBase::DoWriteIndexComplete(self, context);
+    if (context.FinishedSuccessfully) {
+        self.IncCounter(NColumnShard::COUNTER_SPLIT_COMPACTION_SUCCESS);
+    } else {
+        self.IncCounter(NColumnShard::COUNTER_SPLIT_COMPACTION_FAIL);
+    }
+    self.IncCounter(NColumnShard::COUNTER_SPLIT_COMPACTION_BLOBS_WRITTEN, context.BlobsWritten);
+    self.IncCounter(NColumnShard::COUNTER_SPLIT_COMPACTION_BYTES_WRITTEN, context.BytesWritten);
+}
+
+// Runs the base-class start handler and reports the size and count of the
+// granule's "other" portions to the split-compaction info counters.
+void TGeneralCompactColumnEngineChanges::DoStart(NColumnShard::TColumnShard& self) {
+    TBase::DoStart(self);
+    auto& granule = *GranuleMeta;
+    const auto portionsSize = granule.GetAdditiveSummary().GetOther().GetPortionsSize();
+    const auto portionsCount = granule.GetAdditiveSummary().GetOther().GetPortionsCount();
+    self.CSCounters.OnSplitCompactionInfo(portionsSize, portionsCount);
+}
+
+// Maps the outcome of the change to the generic compaction success/fail counter.
+NColumnShard::ECumulativeCounters TGeneralCompactColumnEngineChanges::GetCounterIndex(const bool isSuccess) const {
+    if (isSuccess) {
+        return NColumnShard::COUNTER_COMPACTION_SUCCESS;
+    }
+    return NColumnShard::COUNTER_COMPACTION_FAIL;
+}
+
+}
diff --git a/ydb/core/tx/columnshard/engines/changes/general_compaction.h b/ydb/core/tx/columnshard/engines/changes/general_compaction.h
new file mode 100644
index 0000000000..40a784c556
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/changes/general_compaction.h
@@ -0,0 +1,28 @@
+#pragma once
+#include "compaction.h"
+
+namespace NKikimr::NOlap {
+
+// Compaction change set that merges the selected portions of a granule
+// into a single sorted stream (see DoConstructBlobs in the .cpp file).
+class TGeneralCompactColumnEngineChanges: public TCompactColumnEngineChanges {
+private:
+    using TBase = TCompactColumnEngineChanges;
+
+    // Counter bookkeeping after the index write has finished.
+    void DoWriteIndexComplete(NColumnShard::TColumnShard& self, TWriteIndexCompleteContext& context) override;
+
+protected:
+    // Builds the resulting portions from the switched ones.
+    TConclusionStatus DoConstructBlobs(TConstructionContext& context) noexcept override;
+
+    // Produced portions are tagged as SPLIT_COMPACTED.
+    TPortionMeta::EProduced GetResultProducedClass() const override {
+        return TPortionMeta::EProduced::SPLIT_COMPACTED;
+    }
+
+    void DoStart(NColumnShard::TColumnShard& self) override;
+    NColumnShard::ECumulativeCounters GetCounterIndex(const bool isSuccess) const override;
+
+public:
+    // Inherit all constructors of the base change set.
+    using TBase::TBase;
+
+    // This change set never reports itself as a split.
+    bool IsSplit() const override {
+        return false;
+    }
+
+    TString TypeString() const override {
+        return "GENERAL_COMPACTION";
+    }
+};
+
+}
diff --git a/ydb/core/tx/columnshard/engines/changes/ya.make b/ydb/core/tx/columnshard/engines/changes/ya.make
index 63709818ee..3b5f899410 100644
--- a/ydb/core/tx/columnshard/engines/changes/ya.make
+++ b/ydb/core/tx/columnshard/engines/changes/ya.make
@@ -9,6 +9,7 @@ SRCS(
cleanup.cpp
mark_granules.cpp
with_appended.cpp
+ general_compaction.cpp
)
PEERDIR(
diff --git a/ydb/core/tx/columnshard/engines/storage/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/engines/storage/CMakeLists.darwin-x86_64.txt
index c3377f3caf..293a55fe65 100644
--- a/ydb/core/tx/columnshard/engines/storage/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/tx/columnshard/engines/storage/CMakeLists.darwin-x86_64.txt
@@ -6,6 +6,7 @@
# original buildsystem will not be accepted.
+add_subdirectory(optimizer)
get_built_tool_path(
TOOL_enum_parser_bin
TOOL_enum_parser_dependency
@@ -19,6 +20,7 @@ target_link_libraries(columnshard-engines-storage PUBLIC
yutil
libs-apache-arrow
ydb-core-protos
+ engines-storage-optimizer
core-formats-arrow
tools-enum_parser-enum_serialization_runtime
)
diff --git a/ydb/core/tx/columnshard/engines/storage/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/engines/storage/CMakeLists.linux-aarch64.txt
index b3fe47a3e9..93d122b399 100644
--- a/ydb/core/tx/columnshard/engines/storage/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/tx/columnshard/engines/storage/CMakeLists.linux-aarch64.txt
@@ -6,6 +6,7 @@
# original buildsystem will not be accepted.
+add_subdirectory(optimizer)
get_built_tool_path(
TOOL_enum_parser_bin
TOOL_enum_parser_dependency
@@ -20,6 +21,7 @@ target_link_libraries(columnshard-engines-storage PUBLIC
yutil
libs-apache-arrow
ydb-core-protos
+ engines-storage-optimizer
core-formats-arrow
tools-enum_parser-enum_serialization_runtime
)
diff --git a/ydb/core/tx/columnshard/engines/storage/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/engines/storage/CMakeLists.linux-x86_64.txt
index b3fe47a3e9..93d122b399 100644
--- a/ydb/core/tx/columnshard/engines/storage/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/tx/columnshard/engines/storage/CMakeLists.linux-x86_64.txt
@@ -6,6 +6,7 @@
# original buildsystem will not be accepted.
+add_subdirectory(optimizer)
get_built_tool_path(
TOOL_enum_parser_bin
TOOL_enum_parser_dependency
@@ -20,6 +21,7 @@ target_link_libraries(columnshard-engines-storage PUBLIC
yutil
libs-apache-arrow
ydb-core-protos
+ engines-storage-optimizer
core-formats-arrow
tools-enum_parser-enum_serialization_runtime
)
diff --git a/ydb/core/tx/columnshard/engines/storage/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/engines/storage/CMakeLists.windows-x86_64.txt
index c3377f3caf..293a55fe65 100644
--- a/ydb/core/tx/columnshard/engines/storage/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/tx/columnshard/engines/storage/CMakeLists.windows-x86_64.txt
@@ -6,6 +6,7 @@
# original buildsystem will not be accepted.
+add_subdirectory(optimizer)
get_built_tool_path(
TOOL_enum_parser_bin
TOOL_enum_parser_dependency
@@ -19,6 +20,7 @@ target_link_libraries(columnshard-engines-storage PUBLIC
yutil
libs-apache-arrow
ydb-core-protos
+ engines-storage-optimizer
core-formats-arrow
tools-enum_parser-enum_serialization_runtime
)
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/engines/storage/optimizer/CMakeLists.darwin-x86_64.txt
new file mode 100644
index 0000000000..1d702939cc
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/CMakeLists.darwin-x86_64.txt
@@ -0,0 +1,21 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(engines-storage-optimizer)
+target_link_libraries(engines-storage-optimizer PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ libs-apache-arrow
+ ydb-core-protos
+ core-formats-arrow
+)
+target_sources(engines-storage-optimizer PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/storage/optimizer/optimizer.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/storage/optimizer/intervals_optimizer.cpp
+)
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/engines/storage/optimizer/CMakeLists.linux-aarch64.txt
new file mode 100644
index 0000000000..8953a79b60
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,22 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(engines-storage-optimizer)
+target_link_libraries(engines-storage-optimizer PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ libs-apache-arrow
+ ydb-core-protos
+ core-formats-arrow
+)
+target_sources(engines-storage-optimizer PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/storage/optimizer/optimizer.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/storage/optimizer/intervals_optimizer.cpp
+)
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/engines/storage/optimizer/CMakeLists.linux-x86_64.txt
new file mode 100644
index 0000000000..8953a79b60
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/CMakeLists.linux-x86_64.txt
@@ -0,0 +1,22 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(engines-storage-optimizer)
+target_link_libraries(engines-storage-optimizer PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ libs-apache-arrow
+ ydb-core-protos
+ core-formats-arrow
+)
+target_sources(engines-storage-optimizer PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/storage/optimizer/optimizer.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/storage/optimizer/intervals_optimizer.cpp
+)
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/CMakeLists.txt b/ydb/core/tx/columnshard/engines/storage/optimizer/CMakeLists.txt
new file mode 100644
index 0000000000..f8b31df0c1
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/CMakeLists.txt
@@ -0,0 +1,17 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin-x86_64.txt)
+elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
+ include(CMakeLists.windows-x86_64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-x86_64.txt)
+endif()
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/engines/storage/optimizer/CMakeLists.windows-x86_64.txt
new file mode 100644
index 0000000000..1d702939cc
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/CMakeLists.windows-x86_64.txt
@@ -0,0 +1,21 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(engines-storage-optimizer)
+target_link_libraries(engines-storage-optimizer PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ libs-apache-arrow
+ ydb-core-protos
+ core-formats-arrow
+)
+target_sources(engines-storage-optimizer PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/storage/optimizer/optimizer.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/storage/optimizer/intervals_optimizer.cpp
+)
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/intervals_optimizer.cpp b/ydb/core/tx/columnshard/engines/storage/optimizer/intervals_optimizer.cpp
new file mode 100644
index 0000000000..01b233c6ee
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/intervals_optimizer.cpp
@@ -0,0 +1,98 @@
+#include "intervals_optimizer.h"
+#include <ydb/core/tx/columnshard/engines/changes/general_compaction.h>
+
+namespace NKikimr::NOlap::NStorageOptimizer {
+
+// Builds a compaction task out of the tracked small portions.
+// Returns nullptr while their total size (SumSmall) does not exceed 8 MiB; otherwise
+// collects portions in SmallBlobs order until the collected blob bytes exceed 8 MiB.
+std::shared_ptr<TColumnEngineChanges> TIntervalsOptimizerPlanner::GetSmallPortionsMergeTask(const TCompactionLimits& limits, std::shared_ptr<TGranuleMeta> granule) const {
+    if (!(SumSmall > 8 * 1024 * 1024)) {
+        return nullptr;
+    }
+
+    ui64 currentSum = 0;
+    std::map<ui64, std::shared_ptr<TPortionInfo>> portions;
+    // Collection stops right after the portion that pushes the sum over the threshold.
+    const auto batchIsFull = [&currentSum]() {
+        return currentSum > 8 * 1024 * 1024;
+    };
+
+    for (auto itKey = SmallBlobs.begin(); itKey != SmallBlobs.end() && !batchIsFull(); ++itKey) {
+        const auto& portionsAtKey = itKey->second;
+        for (auto itPortion = portionsAtKey.begin(); itPortion != portionsAtKey.end() && !batchIsFull(); ++itPortion) {
+            currentSum += itPortion->second->BlobsBytes();
+            portions.emplace(itPortion->first, itPortion->second);
+        }
+    }
+
+    AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "take_granule_with_small")("portions", portions.size())("current_sum", currentSum)("remained", SmallBlobs.size())("remained_size", SumSmall);
+    return std::make_shared<TGeneralCompactColumnEngineChanges>(limits, granule, portions);
+}
+
+// Selects portions for the next compaction task of the granule.
+//
+// Order of decisions:
+//  1. If enough small portions have accumulated, merge them (GetSmallPortionsMergeTask).
+//  2. Otherwise take the top-ranked interval from RangedSegments; if it is not critical, do nothing.
+//  3. If the top interval alone is heavier than the raw-weight limit, take its portions one by one
+//     until more than one portion is taken and the limit is exceeded.
+//  4. Otherwise start from the top interval and greedily merge neighbouring intervals (to the left
+//     or to the right, preferring the side with the greater GetUsefulKff) until the collected
+//     weight is enough, a merge would exceed the limit, or there are no neighbours left.
+//
+// Returns nullptr when there is nothing to do, when fewer than two portions were collected,
+// or when any selected portion is listed in busyPortions.
+std::shared_ptr<TColumnEngineChanges> TIntervalsOptimizerPlanner::DoGetOptimizationTask(const TCompactionLimits& limits, std::shared_ptr<TGranuleMeta> granule, const THashSet<TPortionAddress>& busyPortions) const {
+    // Upper bound for the summary raw weight of portions in a single task.
+    constexpr i64 rawWeightLimit = 512 * 1024 * 1024;
+
+    if (auto result = GetSmallPortionsMergeTask(limits, granule)) {
+        return result;
+    }
+    if (RangedSegments.empty()) {
+        AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "no_ranged_segments");
+        return nullptr;
+    }
+    auto& topSegment = **RangedSegments.begin()->second.begin();
+    auto& topFeaturesTask = topSegment.GetFeatures();
+    if (!topFeaturesTask.IsCritical()) {
+        AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "not_critical_task_top");
+        return nullptr;
+    }
+    TIntervalFeatures features;
+    if (topFeaturesTask.GetPortionsRawWeight() > rawWeightLimit) {
+        for (auto&& p : topFeaturesTask.GetSummaryPortions()) {
+            features.Add(p.second);
+            if (features.GetPortionsCount() > 1 && features.GetPortionsRawWeight() > rawWeightLimit) {
+                break;
+            }
+        }
+    } else {
+        auto itFwd = Positions.find(topSegment.GetPosition());
+        Y_VERIFY(itFwd != Positions.end());
+        features = itFwd->second.GetFeatures();
+        // A reverse iterator built from itFwd refers to the element just before it, i.e. the
+        // nearest position to the left of topSegment.GetPosition(); incrementing it moves further left.
+        auto itReverse = std::make_reverse_iterator(itFwd);
+        ++itFwd;
+        while (!features.IsEnoughWeight()) {
+            const bool fwdExhausted = (itFwd == Positions.end());
+            const bool reverseExhausted = (itReverse == Positions.rend());
+            if (fwdExhausted && reverseExhausted) {
+                break;
+            }
+            // Go left when it is the only side left, or when both sides are available and
+            // the left neighbour has the strictly greater usefulness coefficient.
+            const bool takeReverse = fwdExhausted
+                || (!reverseExhausted && itFwd->second.GetFeatures().GetUsefulKff() < itReverse->second.GetFeatures().GetUsefulKff());
+            const TIntervalFeatures& candidate = takeReverse ? itReverse->second.GetFeatures() : itFwd->second.GetFeatures();
+            if (!features.Merge(candidate, rawWeightLimit)) {
+                break;
+            }
+            if (takeReverse) {
+                ++itReverse;
+            } else {
+                ++itFwd;
+            }
+        }
+    }
+
+    // IsCritical() also accepts an interval with a single small portion. When no neighbour
+    // contributes another portion there is nothing to compact, so skip instead of aborting.
+    if (features.GetPortionsCount() <= 1) {
+        AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "take_granule_skip")("features", features.DebugJson().GetStringRobust())
+            ("count", features.GetSummaryPortions().size())("reason", "not_enough_portions");
+        return nullptr;
+    }
+
+    for (auto&& i : features.GetSummaryPortions()) {
+        if (busyPortions.contains(i.second->GetAddress())) {
+            AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "take_granule_skip")("features", features.DebugJson().GetStringRobust())
+                ("count", features.GetSummaryPortions().size())("reason", "busy_portion")("portion_address", i.second->GetAddress().DebugString());
+            return nullptr;
+        }
+    }
+
+    AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "take_granule")("features", features.DebugJson().GetStringRobust())("count", features.GetSummaryPortions().size());
+    return std::make_shared<TGeneralCompactColumnEngineChanges>(limits, granule, features.GetSummaryPortions());
+}
+
+} // namespace NKikimr::NOlap::NStorageOptimizer
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/intervals_optimizer.h b/ydb/core/tx/columnshard/engines/storage/optimizer/intervals_optimizer.h
new file mode 100644
index 0000000000..f695fbf234
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/intervals_optimizer.h
@@ -0,0 +1,359 @@
+#pragma once
+#include "optimizer.h"
+#include <ydb/core/formats/arrow/replace_key.h>
+#include <ydb/library/accessor/accessor.h>
+
+namespace NKikimr::NOlap::NStorageOptimizer {
+
+// One border (start or finish) of a portion's key range.
+// Keeps the portion alive through a shared pointer and refers to the
+// corresponding key (IndexKeyStart or IndexKeyEnd) of that portion.
+class TSegmentPosition {
+private:
+    // Declared before Position: the Position initializer reads the key through this member.
+    std::shared_ptr<TPortionInfo> Portion;
+    const NArrow::TReplaceKey& Position;
+    const bool IsStartFlag;
+
+    // Instances are created only through Start() / Finish().
+    TSegmentPosition(const std::shared_ptr<TPortionInfo>& portion, const bool isStart)
+        : Portion(portion)
+        , Position(isStart ? Portion->IndexKeyStart() : Portion->IndexKeyEnd())
+        , IsStartFlag(isStart)
+    {
+    }
+
+public:
+    // Border of the given portion that opens its key range.
+    static TSegmentPosition Start(const std::shared_ptr<TPortionInfo>& data) {
+        return TSegmentPosition(data, true);
+    }
+
+    // Border of the given portion that closes its key range.
+    static TSegmentPosition Finish(const std::shared_ptr<TPortionInfo>& data) {
+        return TSegmentPosition(data, false);
+    }
+
+    const TPortionInfo& GetPortion() const {
+        return *Portion;
+    }
+
+    const NArrow::TReplaceKey& GetPosition() const {
+        return Position;
+    }
+
+    // JSON map with the single "is_start" field.
+    NJson::TJsonValue DebugJson() const {
+        NJson::TJsonValue json = NJson::JSON_MAP;
+        json.InsertValue("is_start", IsStartFlag);
+        return json;
+    }
+
+    // "ADD:<key>;<portion>;" for a start border, "REMOVE:<key>;<portion>;" for a finish one.
+    TString DebugString() const {
+        const char* const kind = IsStartFlag ? "ADD" : "REMOVE";
+        return TStringBuilder() << kind << ":" << Position.DebugString() << ";" << Portion->DebugString() << ";";
+    }
+
+    // Ordering uses the portion id only; the key and the border kind are not compared.
+    bool operator<(const TSegmentPosition& item) const {
+        return Portion->GetPortion() < item.Portion->GetPortion();
+    }
+};
+
+// Aggregated statistics over a set of portions (stored by portion id in SummaryPortions).
+// Tracks the portions count, their blob and raw byte sums, and separately the count and
+// blob bytes of "small" portions, i.e. those below TSplitSettings().GetMinBlobSize().
+class TIntervalFeatures {
+private:
+    YDB_READONLY(i32, PortionsCount, 0);
+    YDB_READONLY(i64, PortionsWeight, 0);
+    YDB_READONLY(i64, PortionsRawWeight, 0);
+    // Byte accumulator: must be 64-bit like PortionsWeight, it is fed with i64 blob sizes.
+    YDB_READONLY(i64, SmallPortionsWeight, 0);
+    YDB_READONLY(i64, SmallPortionsCount, 0);
+    std::map<ui64, std::shared_ptr<TPortionInfo>> SummaryPortions;
+public:
+
+    // Dumps all counters and the ids of the tracked portions.
+    NJson::TJsonValue DebugJson() const {
+        NJson::TJsonValue result = NJson::JSON_MAP;
+        result.InsertValue("p_count", PortionsCount);
+        result.InsertValue("p_weight", PortionsWeight);
+        result.InsertValue("p_raw_weight", PortionsRawWeight);
+        result.InsertValue("sp_count", SmallPortionsCount);
+        result.InsertValue("sp_weight", SmallPortionsWeight);
+        auto& pIds = result.InsertValue("portion_ids", NJson::JSON_ARRAY);
+        for (auto&& i : SummaryPortions) {
+            pIds.AppendValue(i.first);
+        }
+        return result;
+    }
+
+    // Adds every portion of `features` that is not tracked yet.
+    // Returns false and changes nothing when the sum of both raw weights exceeds sumWeightLimit
+    // (the check is made before de-duplication, so shared portions are counted twice in it).
+    bool Merge(const TIntervalFeatures& features, const i64 sumWeightLimit) {
+        if (PortionsRawWeight + features.PortionsRawWeight > sumWeightLimit) {
+            return false;
+        }
+        for (auto&& i : features.SummaryPortions) {
+            if (SummaryPortions.contains(i.first)) {
+                continue;
+            }
+            Add(i.second);
+        }
+        return true;
+    }
+
+    // Ranking metric: the portions count, except that a single portion yields 0.
+    i64 GetUsefulMetric() const {
+        if (PortionsCount == 1) {
+            return 0;
+        }
+        return PortionsCount;
+    }
+
+    // Useful metric per blob byte; Max<double>() for an empty set.
+    // Aborts when portions are present but their summary blob weight is zero.
+    double GetUsefulKff() const {
+        if (PortionsCount == 0) {
+            return Max<double>();
+        }
+        Y_VERIFY(PortionsWeight);
+        return 1.0 * GetUsefulMetric() / PortionsWeight;
+    }
+
+    // Starts tracking the portion; aborts if a portion with the same id is already tracked.
+    void Add(const std::shared_ptr<TPortionInfo>& info) {
+        AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "add_portion_in_summary")("portion_id", info->GetPortion())("count", SummaryPortions.size())("this", (ui64)this);
+        AFL_VERIFY(SummaryPortions.emplace(info->GetPortion(), info).second)("portion_id", info->GetPortion())("this", (ui64)this);
+        ++PortionsCount;
+        const i64 portionBytes = info->BlobsBytes();
+        PortionsWeight += portionBytes;
+        PortionsRawWeight += info->RawBytesSum();
+        if (portionBytes < TSplitSettings().GetMinBlobSize()) {
+            ++SmallPortionsCount;
+            SmallPortionsWeight += portionBytes;
+        }
+    }
+
+    // Stops tracking the portion; aborts if it is not tracked or any counter would become negative.
+    void Remove(const std::shared_ptr<TPortionInfo>& info) {
+        AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "remove_portion_from_summary")("portion_id", info->GetPortion())("count", SummaryPortions.size())("this", (ui64)this);
+        AFL_VERIFY(SummaryPortions.erase(info->GetPortion()))("portion_id", info->GetPortion())("this", (ui64)this);
+        // State changes are kept outside of the verification expressions.
+        --PortionsCount;
+        Y_VERIFY(PortionsCount >= 0);
+        const i64 portionBytes = info->BlobsBytes();
+        PortionsWeight -= portionBytes;
+        Y_VERIFY(PortionsWeight >= 0);
+        PortionsRawWeight -= info->RawBytesSum();
+        Y_VERIFY(PortionsRawWeight >= 0);
+        if (portionBytes < TSplitSettings().GetMinBlobSize()) {
+            --SmallPortionsCount;
+            Y_VERIFY(SmallPortionsCount >= 0);
+            SmallPortionsWeight -= portionBytes;
+            Y_VERIFY(SmallPortionsWeight >= 0);
+        }
+    }
+
+    // True when no portions are tracked.
+    bool operator!() const {
+        return !PortionsCount;
+    }
+
+    // Greater useful metric sorts first; objects with equal metrics are equivalent for ordered containers.
+    bool operator<(const TIntervalFeatures& item) const {
+        return GetUsefulMetric() > item.GetUsefulMetric();
+    }
+    const std::map<ui64, std::shared_ptr<TPortionInfo>>& GetSummaryPortions() const {
+        return SummaryPortions;
+    }
+    // True when the summary blob weight is above the minimal blob size.
+    bool IsEnoughWeight() const {
+        return GetPortionsWeight() > TSplitSettings().GetMinBlobSize();
+    }
+
+    // True when more than one portion is tracked or at least one tracked portion is small.
+    bool IsCritical() const {
+        return PortionsCount > 1 || SmallPortionsCount;
+    }
+
+};
+
+// Optimizer planner that keeps, for every border key of the registered portions, the features
+// of the interval starting at that key, and ranks those intervals to choose compaction tasks.
+// Portions below 1 MiB of blob bytes are additionally tracked in SmallBlobs / SumSmall.
+class TIntervalsOptimizerPlanner: public IOptimizerPlanner {
+private:
+    using TBase = IOptimizerPlanner;
+
+    // Key of a portion border inside one position: (portion id, start/finish flag).
+    class TPortionIntervalPoint {
+    private:
+        YDB_READONLY(ui64, PortionId, 0);
+        YDB_READONLY(bool, IsStart, false);
+    public:
+        TPortionIntervalPoint(const ui64 portionId, const bool isStart)
+            : PortionId(portionId)
+            , IsStart(isStart)
+        {
+
+        }
+
+        bool operator<(const TPortionIntervalPoint& item) const {
+            return std::tie(PortionId, IsStart) < std::tie(item.PortionId, item.IsStart);
+        }
+    };
+
+    // All portion borders located at one key plus the features of the portions
+    // covering the interval that starts at this key.
+    class TBorderPositions {
+    private:
+        const NArrow::TReplaceKey Position;
+        std::map<TPortionIntervalPoint, TSegmentPosition> Positions;
+        YDB_READONLY_DEF(TIntervalFeatures, Features);
+    public:
+        TBorderPositions(const NArrow::TReplaceKey& position)
+            : Position(position)
+        {
+
+        }
+
+        NJson::TJsonValue DebugJson() const {
+            NJson::TJsonValue result = NJson::JSON_MAP;
+            result.InsertValue("p", Position.DebugString());
+            auto& segments = result.InsertValue("segments", NJson::JSON_ARRAY);
+            for (auto&& i : Positions) {
+                segments.AppendValue(i.second.DebugJson());
+            }
+            result.InsertValue("features", Features.DebugJson());
+            return result;
+        }
+
+        // Copies only the features; the border points of `source` are not copied.
+        void CopyFrom(const TBorderPositions& source) {
+            Features = source.Features;
+        }
+
+        const NArrow::TReplaceKey& GetPosition() const {
+            return Position;
+        }
+
+        void AddStart(const std::shared_ptr<TPortionInfo>& info) {
+            Y_VERIFY(Positions.emplace(TPortionIntervalPoint(info->GetPortion(), true), TSegmentPosition::Start(info)).second);
+        }
+        void AddFinish(const std::shared_ptr<TPortionInfo>& info) {
+            Y_VERIFY(Positions.emplace(TPortionIntervalPoint(info->GetPortion(), false), TSegmentPosition::Finish(info)).second);
+        }
+        // Returns true when no border points remain at this key.
+        bool RemoveStart(const std::shared_ptr<TPortionInfo>& info) {
+            Y_VERIFY(Positions.erase(TPortionIntervalPoint(info->GetPortion(), true)));
+            return Positions.empty();
+        }
+        // Returns true when no border points remain at this key.
+        bool RemoveFinish(const std::shared_ptr<TPortionInfo>& info) {
+            Y_VERIFY(Positions.erase(TPortionIntervalPoint(info->GetPortion(), false)));
+            return Positions.empty();
+        }
+        void AddSummary(const std::shared_ptr<TPortionInfo>& info) {
+            Features.Add(info);
+        }
+        void RemoveSummary(const std::shared_ptr<TPortionInfo>& info) {
+            Features.Remove(info);
+        }
+    };
+    // Intervals with non-empty features grouped by features (ordered by TIntervalFeatures::operator<).
+    // Stores pointers to values of Positions.
+    std::map<TIntervalFeatures, std::set<const TBorderPositions*>> RangedSegments;
+    using TPositions = std::map<NArrow::TReplaceKey, TBorderPositions>;
+    TPositions Positions;
+    // Summary blob bytes of the portions registered in SmallBlobs.
+    i64 SumSmall = 0;
+    // Small portions grouped by their start key, then by portion id.
+    std::map<NArrow::TReplaceKey, std::map<ui64, std::shared_ptr<TPortionInfo>>> SmallBlobs;
+
+    // Single definition of "small" shared by AddSmallPortion and RemoveSmallPortion.
+    static bool IsSmallPortion(const std::shared_ptr<TPortionInfo>& info) {
+        return info->BlobsBytes() < 1024 * 1024;
+    }
+
+    // Unregisters the interval from RangedSegments; must be called before its features change.
+    void RemoveRanged(const TBorderPositions& data) {
+        if (!!data.GetFeatures()) {
+            auto itFeatures = RangedSegments.find(data.GetFeatures());
+            // A missing group means broken bookkeeping: fail explicitly instead of dereferencing end().
+            Y_VERIFY(itFeatures != RangedSegments.end());
+            Y_VERIFY(itFeatures->second.erase(&data));
+            if (itFeatures->second.empty()) {
+                RangedSegments.erase(itFeatures);
+            }
+        }
+    }
+
+    // Registers the interval in RangedSegments if its features are not empty.
+    void AddRanged(const TBorderPositions& data) {
+        if (!!data.GetFeatures()) {
+            Y_VERIFY(RangedSegments[data.GetFeatures()].emplace(&data).second);
+        }
+    }
+
+    // Returns true when the portion is small and has been removed from the small-portions index.
+    bool RemoveSmallPortion(const std::shared_ptr<TPortionInfo>& info) {
+        if (IsSmallPortion(info)) {
+            auto it = SmallBlobs.find(info->IndexKeyStart());
+            // The portion must have been registered by AddSmallPortion.
+            Y_VERIFY(it != SmallBlobs.end());
+            Y_VERIFY(it->second.erase(info->GetPortion()));
+            if (it->second.empty()) {
+                SmallBlobs.erase(it);
+            }
+            SumSmall -= info->BlobsBytes();
+            Y_VERIFY(SumSmall >= 0);
+            return true;
+        }
+        return false;
+    }
+
+    // Returns true when the portion is small and has been added to the small-portions index.
+    bool AddSmallPortion(const std::shared_ptr<TPortionInfo>& info) {
+        if (IsSmallPortion(info)) {
+            Y_VERIFY(SmallBlobs[info->IndexKeyStart()].emplace(info->GetPortion(), info).second);
+            SumSmall += info->BlobsBytes();
+            return true;
+        }
+        return false;
+    }
+
+    std::shared_ptr<TColumnEngineChanges> GetSmallPortionsMergeTask(const TCompactionLimits& limits, std::shared_ptr<TGranuleMeta> granule) const;
+
+protected:
+    // Registers the portion: creates the border positions for its start and end keys when
+    // they are missing (a new position inherits the features of its left neighbour) and adds
+    // the portion to the features of every position in [start key, end key).
+    virtual void DoAddPortion(const std::shared_ptr<TPortionInfo>& info) override {
+        AddSmallPortion(info);
+        AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "add_portion")("portion_id", info->GetPortion());
+        auto itStart = Positions.find(info->IndexKeyStart());
+        if (itStart == Positions.end()) {
+            itStart = Positions.emplace(info->IndexKeyStart(), TBorderPositions(info->IndexKeyStart())).first;
+            if (itStart != Positions.begin()) {
+                auto itStartCopy = itStart;
+                --itStartCopy;
+                itStart->second.CopyFrom(itStartCopy->second);
+                AddRanged(itStart->second);
+            }
+        }
+        auto itEnd = Positions.find(info->IndexKeyEnd());
+        if (itEnd == Positions.end()) {
+            itEnd = Positions.emplace(info->IndexKeyEnd(), TBorderPositions(info->IndexKeyEnd())).first;
+            // The start position already exists, so the end position cannot be the first one.
+            Y_VERIFY(itEnd != Positions.begin());
+            auto itEndCopy = itEnd;
+            --itEndCopy;
+            itEnd->second.CopyFrom(itEndCopy->second);
+            AddRanged(itEnd->second);
+            itStart = Positions.find(info->IndexKeyStart());
+        }
+        Y_VERIFY(itStart != Positions.end());
+        Y_VERIFY(itEnd != Positions.end());
+        itStart->second.AddStart(info);
+        itEnd->second.AddFinish(info);
+        for (auto it = itStart; it != itEnd; ++it) {
+            // RangedSegments is keyed by features: re-register around every change.
+            RemoveRanged(it->second);
+            it->second.AddSummary(info);
+            AddRanged(it->second);
+        }
+    }
+    // Unregisters the portion: removes it from the features of every position in
+    // [start key, end key) and drops border positions that have no border points left.
+    virtual void DoRemovePortion(const std::shared_ptr<TPortionInfo>& info) override {
+        RemoveSmallPortion(info);
+        AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "remove_portion")("portion_id", info->GetPortion());
+        auto itStart = Positions.find(info->IndexKeyStart());
+        auto itFinish = Positions.find(info->IndexKeyEnd());
+        Y_VERIFY(itStart != Positions.end());
+        Y_VERIFY(itFinish != Positions.end());
+        for (auto it = itStart; it != itFinish; ++it) {
+            RemoveRanged(it->second);
+            it->second.RemoveSummary(info);
+            AddRanged(it->second);
+        }
+        if (itStart->second.RemoveStart(info)) {
+            RemoveRanged(itStart->second);
+            Positions.erase(itStart);
+        }
+        if (itFinish->second.RemoveFinish(info)) {
+            RemoveRanged(itFinish->second);
+            Positions.erase(itFinish);
+        }
+    }
+    virtual std::shared_ptr<TColumnEngineChanges> DoGetOptimizationTask(const TCompactionLimits& limits, std::shared_ptr<TGranuleMeta> granule, const THashSet<TPortionAddress>& busyPortions) const override;
+    // Useful metric of the top-ranked interval, 0 when there are no ranked intervals.
+    virtual i64 DoGetUsefulMetric() const override {
+        if (RangedSegments.empty()) {
+            return 0;
+        }
+        auto& topSegment = **RangedSegments.begin()->second.begin();
+        auto& topFeaturesTask = topSegment.GetFeatures();
+        return topFeaturesTask.GetUsefulMetric();
+    }
+public:
+    // JSON dump of all border positions.
+    virtual TString GetDescription() const override {
+        NJson::TJsonValue result = NJson::JSON_MAP;
+        auto& positions = result.InsertValue("positions", NJson::JSON_ARRAY);
+        for (auto&& i : Positions) {
+            positions.AppendValue(i.second.DebugJson());
+        }
+        return result.GetStringRobust();
+    }
+
+    TIntervalsOptimizerPlanner(const ui64 granuleId)
+        : TBase(granuleId)
+    {
+
+    }
+};
+
+} // namespace NKikimr::NOlap::NStorageOptimizer
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/optimizer.cpp b/ydb/core/tx/columnshard/engines/storage/optimizer/optimizer.cpp
new file mode 100644
index 0000000000..b9382c8f31
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/optimizer.cpp
@@ -0,0 +1,5 @@
+#include "optimizer.h"
+
+namespace NKikimr::NOlap::NStorageOptimizer {
+
+} // namespace NKikimr::NOlap::NStorageOptimizer
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/optimizer.h b/ydb/core/tx/columnshard/engines/storage/optimizer/optimizer.h
new file mode 100644
index 0000000000..c9894047ed
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/optimizer.h
@@ -0,0 +1,55 @@
+#pragma once
+#include <ydb/core/tx/columnshard/engines/portions/portion_info.h>
+#include <ydb/core/tx/columnshard/engines/changes/compaction.h>
+#include <library/cpp/object_factory/object_factory.h>
+
+namespace NKikimr::NOlap {
+struct TCompactionLimits;
+class TGranuleMeta;
+}
+
+namespace NKikimr::NOlap::NStorageOptimizer {
+
+// Interface of a per-granule optimization planner.
+// The public non-virtual methods validate their arguments, install a log context tagged
+// with the granule id (all except GetUsefulMetric) and delegate to the Do* hooks that
+// concrete planners implement.
+class IOptimizerPlanner {
+public:
+    using TFactory = NObjectFactory::TObjectFactory<IOptimizerPlanner, TString>;
+
+    IOptimizerPlanner(const ui64 granuleId)
+        : GranuleId(granuleId) {
+    }
+
+    virtual ~IOptimizerPlanner() = default;
+
+    // Human-readable planner state; empty unless a subclass overrides it.
+    virtual TString GetDescription() const {
+        return "";
+    }
+
+    // Registers a portion with the planner; `info` must be non-null.
+    void AddPortion(const std::shared_ptr<TPortionInfo>& info) {
+        Y_VERIFY(info);
+        NActors::TLogContextGuard logContext(NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("granule_id", GranuleId));
+        DoAddPortion(info);
+    }
+
+    // Unregisters a portion from the planner; `info` must be non-null.
+    void RemovePortion(const std::shared_ptr<TPortionInfo>& info) {
+        Y_VERIFY(info);
+        NActors::TLogContextGuard logContext(NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("granule_id", GranuleId));
+        DoRemovePortion(info);
+    }
+
+    // Returns whatever the concrete planner's DoGetOptimizationTask() produces for these arguments.
+    std::shared_ptr<TColumnEngineChanges> GetOptimizationTask(const TCompactionLimits& limits, std::shared_ptr<TGranuleMeta> granule, const THashSet<TPortionAddress>& busyPortions) const {
+        NActors::TLogContextGuard logContext(NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("granule_id", GranuleId));
+        return DoGetOptimizationTask(limits, granule, busyPortions);
+    }
+
+    // Returns the metric computed by the concrete planner's DoGetUsefulMetric().
+    i64 GetUsefulMetric() const {
+        return DoGetUsefulMetric();
+    }
+
+protected:
+    // Implementation hooks; AddPortion()/RemovePortion() guarantee a non-null `info`.
+    virtual void DoAddPortion(const std::shared_ptr<TPortionInfo>& info) = 0;
+    virtual void DoRemovePortion(const std::shared_ptr<TPortionInfo>& info) = 0;
+    virtual std::shared_ptr<TColumnEngineChanges> DoGetOptimizationTask(const TCompactionLimits& limits, std::shared_ptr<TGranuleMeta> granule, const THashSet<TPortionAddress>& busyPortions) const = 0;
+    virtual i64 DoGetUsefulMetric() const = 0;
+
+private:
+    // Used only to tag log records emitted while a hook runs.
+    const ui64 GranuleId;
+};
+
+} // namespace NKikimr::NOlap::NStorageOptimizer
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/ut/ut_optimizer.cpp b/ydb/core/tx/columnshard/engines/storage/optimizer/ut/ut_optimizer.cpp
new file mode 100644
index 0000000000..1c21de2475
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/ut/ut_optimizer.cpp
@@ -0,0 +1,90 @@
+#include <library/cpp/testing/unittest/registar.h>
+#include <ydb/core/tx/columnshard/splitter/rb_splitter.h>
+#include <ydb/core/tx/columnshard/counters/indexation.h>
+#include <ydb/core/tx/columnshard/engines/storage/optimizer/intervals_optimizer.h>
+#include <ydb/core/formats/arrow/serializer/batch_only.h>
+#include <ydb/core/formats/arrow/simple_builder/batch.h>
+#include <ydb/core/formats/arrow/simple_builder/filler.h>
+#include <ydb/core/formats/arrow/serializer/full.h>
+#include <contrib/libs/apache/arrow/cpp/src/arrow/type.h>
+
+Y_UNIT_TEST_SUITE(StorageOptimizer) {
+
+ using namespace NKikimr::NArrow;
+
+    // Test helper that builds portions with increasing portion ids and an int64 "pk" key range.
+    class TPortionsMaker {
+    private:
+        // Last issued portion id; Make() pre-increments it, so ids start from 1.
+        ui64 PortionId = 0;
+
+    public:
+        // Builds a replace key over a single int64 column named "pk", filled from `value`.
+        static TReplaceKey MakeKey(const i64 value) {
+            using TPkFiller = NConstruction::TIntSeqFiller<arrow::Int64Type>;
+            using TPkConstructor = NConstruction::TSimpleArrayConstructor<TPkFiller>;
+            NConstruction::IArrayBuilder::TPtr pkBuilder = std::make_shared<TPkConstructor>("pk", TPkFiller(value));
+            return TReplaceKey({pkBuilder->BuildArray(1)}, 0);
+        }
+
+        // Creates a portion whose index keys are [pkStart, pkFinish] and which carries one test
+        // column record. NOTE(review): `size` is forwarded as the 4th argument of
+        // TTestInstanceBuilder::Build - presumably the blob size; confirm.
+        std::shared_ptr<NKikimr::NOlap::TPortionInfo> Make(const i64 pkStart, const i64 pkFinish, const i64 size) {
+            auto portion = std::make_shared<NKikimr::NOlap::TPortionInfo>(0, ++PortionId, NKikimr::NOlap::TSnapshot(1, 1));
+            portion->MutableMeta().IndexKeyStart = MakeKey(pkStart);
+            portion->MutableMeta().IndexKeyEnd = MakeKey(pkFinish);
+            portion->Records.emplace_back(NKikimr::NOlap::TColumnRecord::TTestInstanceBuilder::Build(1, 0, 0, size, 1, 1));
+            return portion;
+        }
+    };
+
+    // With a single registered portion the planner is expected to produce no task.
+    Y_UNIT_TEST(Empty) {
+        TPortionsMaker maker;
+        NKikimr::NOlap::NStorageOptimizer::TIntervalsOptimizerPlanner planner(0);
+        planner.AddPortion(maker.Make(1, 100, 10000));
+        Cerr << planner.GetDescription() << Endl;
+        NKikimr::NOlap::TCompactionLimits limits;
+        // GetOptimizationTask() has a mandatory third parameter (the busy portions set);
+        // nothing is busy in this test, so an empty set is passed.
+        const auto task = planner.GetOptimizationTask(limits, nullptr, {});
+        // UNIT_ASSERT reports a test failure instead of aborting the whole test binary.
+        UNIT_ASSERT(!task);
+    }
+
+    // Two adjacent, non-overlapping portions are expected to end up in one compaction task.
+    Y_UNIT_TEST(MergeSmall) {
+        TPortionsMaker maker;
+        NKikimr::NOlap::NStorageOptimizer::TIntervalsOptimizerPlanner planner(0);
+        planner.AddPortion(maker.Make(1, 100, 10000));
+        planner.AddPortion(maker.Make(101, 200, 10000));
+        Cerr << planner.GetDescription() << Endl;
+        NKikimr::NOlap::TCompactionLimits limits;
+        // GetOptimizationTask() has a mandatory third parameter (the busy portions set);
+        // nothing is busy in this test, so an empty set is passed.
+        const auto task = std::dynamic_pointer_cast<NKikimr::NOlap::TCompactColumnEngineChanges>(planner.GetOptimizationTask(limits, nullptr, {}));
+        // UNIT_ASSERT* report a test failure instead of aborting the whole test binary.
+        UNIT_ASSERT(task);
+        UNIT_ASSERT_VALUES_EQUAL(task->SwitchedPortions.size(), 2u);
+    }
+
+    // Four mutually overlapping portions are expected to be merged by a single task.
+    Y_UNIT_TEST(MergeSmall1) {
+        TPortionsMaker maker;
+        NKikimr::NOlap::NStorageOptimizer::TIntervalsOptimizerPlanner planner(0);
+        planner.AddPortion(maker.Make(1, 100, 10000));
+        planner.AddPortion(maker.Make(2, 101, 20000));
+        planner.AddPortion(maker.Make(10, 200, 30000));
+        planner.AddPortion(maker.Make(10, 20, 40000));
+        Cerr << planner.GetDescription() << Endl;
+        NKikimr::NOlap::TCompactionLimits limits;
+        // GetOptimizationTask() has a mandatory third parameter (the busy portions set);
+        // nothing is busy in this test, so an empty set is passed.
+        const auto task = std::dynamic_pointer_cast<NKikimr::NOlap::TCompactColumnEngineChanges>(planner.GetOptimizationTask(limits, nullptr, {}));
+        // UNIT_ASSERT* report a test failure instead of aborting the whole test binary.
+        UNIT_ASSERT(task);
+        UNIT_ASSERT_VALUES_EQUAL(task->SwitchedPortions.size(), 4u);
+    }
+
+    // One huge portion plus four small ones: the task is expected to switch exactly two
+    // portions, with ids 1 and 2 (ids are assigned by TPortionsMaker in creation order).
+    Y_UNIT_TEST(MergeSmall2) {
+        TPortionsMaker maker;
+        NKikimr::NOlap::NStorageOptimizer::TIntervalsOptimizerPlanner planner(0);
+        planner.AddPortion(maker.Make(1, 10000, 10000000));
+        planner.AddPortion(maker.Make(1, 100, 10000));
+        planner.AddPortion(maker.Make(2, 101, 20000));
+        planner.AddPortion(maker.Make(1000, 20000, 30000));
+        planner.AddPortion(maker.Make(1000, 2000, 40000));
+        Cerr << planner.GetDescription() << Endl;
+        NKikimr::NOlap::TCompactionLimits limits;
+        // GetOptimizationTask() has a mandatory third parameter (the busy portions set);
+        // nothing is busy in this test, so an empty set is passed.
+        const auto task = std::dynamic_pointer_cast<NKikimr::NOlap::TCompactColumnEngineChanges>(planner.GetOptimizationTask(limits, nullptr, {}));
+        // UNIT_ASSERT* report a test failure instead of aborting the whole test binary.
+        UNIT_ASSERT(task);
+        UNIT_ASSERT_VALUES_EQUAL(task->SwitchedPortions.size(), 2u);
+        UNIT_ASSERT_VALUES_EQUAL(task->SwitchedPortions[0].GetPortion(), 1u);
+        UNIT_ASSERT_VALUES_EQUAL(task->SwitchedPortions[1].GetPortion(), 2u);
+    }
+
+};
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/ut/ya.make b/ydb/core/tx/columnshard/engines/storage/optimizer/ut/ya.make
new file mode 100644
index 0000000000..9b7fcea27f
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/ut/ya.make
@@ -0,0 +1,28 @@
+UNITTEST_FOR(ydb/core/tx/columnshard/engines/storage/optimizer)
+
+SIZE(SMALL)
+
+# Libraries linked into the optimizer unit test.
+# The duplicated ydb/library/mkql_proto entry has been removed.
+PEERDIR(
+    ydb/core/tx/columnshard/engines/changes
+    ydb/core/tx/columnshard/engines
+    ydb/core/tx/columnshard
+    ydb/library/yql/public/udf
+    ydb/core/formats/arrow/compression
+    ydb/core/grpc_services
+    ydb/core/scheme
+    ydb/core/ydb_convert
+    ydb/library/mkql_proto
+    ydb/core/tx/tx_proxy
+    ydb/core/tx/schemeshard
+    ydb/library/yql/parser/pg_wrapper
+    ydb/core/persqueue
+    ydb/core/tx/time_cast
+    ydb/library/yql/sql/pg
+)
+
+SRCS(
+ ut_optimizer.cpp
+)
+
+END()
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/ya.make b/ydb/core/tx/columnshard/engines/storage/optimizer/ya.make
new file mode 100644
index 0000000000..99e60e419e
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/ya.make
@@ -0,0 +1,18 @@
+LIBRARY()
+
+SRCS(
+ optimizer.cpp
+ intervals_optimizer.cpp
+)
+
+# Direct dependencies of the optimizer library.
+# NOTE(review): optimizer.h also includes headers from
+# ydb/core/tx/columnshard/engines/portions, ydb/core/tx/columnshard/engines/changes and
+# library/cpp/object_factory, none of which are listed here - presumably they are
+# reached transitively (or omitted to avoid a dependency cycle); confirm.
+PEERDIR(
+ contrib/libs/apache/arrow
+ ydb/core/protos
+ ydb/core/formats/arrow
+)
+
+END()
+
+RECURSE_FOR_TESTS(
+ ut
+)
diff --git a/ydb/core/tx/columnshard/engines/storage/ya.make b/ydb/core/tx/columnshard/engines/storage/ya.make
index 9f9b74999f..811707b20d 100644
--- a/ydb/core/tx/columnshard/engines/storage/ya.make
+++ b/ydb/core/tx/columnshard/engines/storage/ya.make
@@ -8,6 +8,7 @@ SRCS(
PEERDIR(
contrib/libs/apache/arrow
ydb/core/protos
+ ydb/core/tx/columnshard/engines/storage/optimizer
ydb/core/formats/arrow
)