author     ivanmorozov <ivanmorozov@yandex-team.com>  2023-07-23 10:00:51 +0300
committer  ivanmorozov <ivanmorozov@yandex-team.com>  2023-07-23 10:00:51 +0300
commit     b71a866bb25a4da400b09b0dc754094946a9f67d (patch)
tree       035bff4eff806852ebaae3e70ebeec22e24c66a8
parent     c402475a0a16ee3bb780ef7849a19bac8956e21f (diff)
download   ydb-b71a866bb25a4da400b09b0dc754094946a9f67d.tar.gz
KIKIMR-18796: finalize encapsulation of service activity in CS
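The change removes the standalone index_logic_logs helpers (e.g. TCompactionLogic used by compaction_actor.cpp) and moves blob construction into the TColumnEngineChanges hierarchy itself, adding a Constructed stage between Started and Compiled. Below is a condensed, hypothetical C++ sketch of the resulting stage machine; the class and method names here are simplified stand-ins, and the real contexts, counters and error handling are in the abstract.h/abstract.cpp hunks further down.

// Hypothetical, simplified model of the stage flow this commit introduces for
// TColumnEngineChanges (compare with the abstract.h/abstract.cpp hunks below).
#include <cassert>
#include <string>
#include <vector>

enum class EStage { Created, Started, Constructed, Compiled, Applied, Written, Finished, Aborted };

class TChangesSketch {
public:
    virtual ~TChangesSketch() = default;

    void Start() {
        assert(Stage == EStage::Created);
        // Changes that produce no blobs (e.g. cleanup) skip straight to Constructed.
        Stage = NeedConstruction() ? EStage::Started : EStage::Constructed;
    }
    std::vector<std::string> ConstructBlobs() {  // runs in a background actor
        assert(Stage == EStage::Started);
        auto blobs = DoConstructBlobs();
        Stage = EStage::Constructed;
        return blobs;
    }
    void Compile()            { assert(Stage == EStage::Constructed); Stage = EStage::Compiled; }
    void ApplyChanges()       { assert(Stage == EStage::Compiled);    Stage = EStage::Applied; }
    void WriteIndex()         { assert(Stage == EStage::Applied);     Stage = EStage::Written; }
    void WriteIndexComplete() { assert(Stage == EStage::Written);     Stage = EStage::Finished; }

protected:
    virtual bool NeedConstruction() const { return true; }
    virtual std::vector<std::string> DoConstructBlobs() { return {}; }

private:
    EStage Stage = EStage::Created;
};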
-rw-r--r--  ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt  2
-rw-r--r--  ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt  2
-rw-r--r--  ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt  2
-rw-r--r--  ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt  2
-rw-r--r--  ydb/core/tx/columnshard/columnshard_impl.cpp  3
-rw-r--r--  ydb/core/tx/columnshard/compaction_actor.cpp  5
-rw-r--r--  ydb/core/tx/columnshard/engines/CMakeLists.darwin-x86_64.txt  1
-rw-r--r--  ydb/core/tx/columnshard/engines/CMakeLists.linux-aarch64.txt  1
-rw-r--r--  ydb/core/tx/columnshard/engines/CMakeLists.linux-x86_64.txt  1
-rw-r--r--  ydb/core/tx/columnshard/engines/CMakeLists.windows-x86_64.txt  1
-rw-r--r--  ydb/core/tx/columnshard/engines/changes/CMakeLists.darwin-x86_64.txt  3
-rw-r--r--  ydb/core/tx/columnshard/engines/changes/CMakeLists.linux-aarch64.txt  3
-rw-r--r--  ydb/core/tx/columnshard/engines/changes/CMakeLists.linux-x86_64.txt  3
-rw-r--r--  ydb/core/tx/columnshard/engines/changes/CMakeLists.windows-x86_64.txt  3
-rw-r--r--  ydb/core/tx/columnshard/engines/changes/abstract.cpp  78
-rw-r--r--  ydb/core/tx/columnshard/engines/changes/abstract.h  106
-rw-r--r--  ydb/core/tx/columnshard/engines/changes/cleanup.h  8
-rw-r--r--  ydb/core/tx/columnshard/engines/changes/in_granule_compaction.cpp  68
-rw-r--r--  ydb/core/tx/columnshard/engines/changes/in_granule_compaction.h  20
-rw-r--r--  ydb/core/tx/columnshard/engines/changes/indexation.cpp  81
-rw-r--r--  ydb/core/tx/columnshard/engines/changes/indexation.h  4
-rw-r--r--  ydb/core/tx/columnshard/engines/changes/mark_granules.cpp  1
-rw-r--r--  ydb/core/tx/columnshard/engines/changes/split_compaction.cpp  391
-rw-r--r--  ydb/core/tx/columnshard/engines/changes/split_compaction.h  27
-rw-r--r--  ydb/core/tx/columnshard/engines/changes/ttl.cpp  74
-rw-r--r--  ydb/core/tx/columnshard/engines/changes/ttl.h  8
-rw-r--r--  ydb/core/tx/columnshard/engines/changes/with_appended.cpp  44
-rw-r--r--  ydb/core/tx/columnshard/engines/changes/with_appended.h  6
-rw-r--r--  ydb/core/tx/columnshard/engines/changes/ya.make  3
-rw-r--r--  ydb/core/tx/columnshard/engines/column_engine_logs.cpp  35
-rw-r--r--  ydb/core/tx/columnshard/engines/column_engine_logs.h  27
-rw-r--r--  ydb/core/tx/columnshard/engines/index_logic_logs.cpp  803
-rw-r--r--  ydb/core/tx/columnshard/engines/index_logic_logs.h  123
-rw-r--r--  ydb/core/tx/columnshard/engines/indexed_read_data.cpp  1
-rw-r--r--  ydb/core/tx/columnshard/engines/portions/portion_info.cpp  5
-rw-r--r--  ydb/core/tx/columnshard/engines/ut_logs_engine.cpp  14
-rw-r--r--  ydb/core/tx/columnshard/engines/ut_program.cpp  1
-rw-r--r--  ydb/core/tx/columnshard/engines/ya.make  1
-rw-r--r--  ydb/core/tx/columnshard/eviction_actor.cpp  6
-rw-r--r--  ydb/core/tx/columnshard/indexing_actor.cpp  5
-rw-r--r--  ydb/core/tx/columnshard/splitter/CMakeLists.darwin-x86_64.txt  20
-rw-r--r--  ydb/core/tx/columnshard/splitter/CMakeLists.linux-aarch64.txt  21
-rw-r--r--  ydb/core/tx/columnshard/splitter/CMakeLists.linux-x86_64.txt  21
-rw-r--r--  ydb/core/tx/columnshard/splitter/CMakeLists.txt  17
-rw-r--r--  ydb/core/tx/columnshard/splitter/CMakeLists.windows-x86_64.txt  20
-rw-r--r--  ydb/core/tx/columnshard/splitter/splitter.cpp  112
-rw-r--r--  ydb/core/tx/columnshard/splitter/splitter.h  32
-rw-r--r--  ydb/core/tx/columnshard/splitter/ya.make  13
-rw-r--r--  ydb/core/tx/columnshard/tables_manager.cpp  5
-rw-r--r--  ydb/core/tx/columnshard/tables_manager.h  7
-rw-r--r--  ydb/core/tx/columnshard/ya.make  1
51 files changed, 1201 insertions, 1040 deletions
diff --git a/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt
index 85db168ab0f..07c0eaea85c 100644
--- a/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt
@@ -11,6 +11,7 @@ add_subdirectory(counters)
add_subdirectory(engines)
add_subdirectory(hooks)
add_subdirectory(resources)
+add_subdirectory(splitter)
add_subdirectory(ut_rw)
add_subdirectory(ut_schema)
get_built_tool_path(
@@ -48,6 +49,7 @@ target_link_libraries(core-tx-columnshard PUBLIC
columnshard-engines-writer
tx-columnshard-counters
tx-columnshard-common
+ tx-columnshard-splitter
core-tx-tiering
tx-conveyor-usage
tx-long_tx_service-public
diff --git a/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt
index 44e7122c497..14cdf344f20 100644
--- a/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt
@@ -11,6 +11,7 @@ add_subdirectory(counters)
add_subdirectory(engines)
add_subdirectory(hooks)
add_subdirectory(resources)
+add_subdirectory(splitter)
add_subdirectory(ut_rw)
add_subdirectory(ut_schema)
get_built_tool_path(
@@ -49,6 +50,7 @@ target_link_libraries(core-tx-columnshard PUBLIC
columnshard-engines-writer
tx-columnshard-counters
tx-columnshard-common
+ tx-columnshard-splitter
core-tx-tiering
tx-conveyor-usage
tx-long_tx_service-public
diff --git a/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt
index 44e7122c497..14cdf344f20 100644
--- a/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt
@@ -11,6 +11,7 @@ add_subdirectory(counters)
add_subdirectory(engines)
add_subdirectory(hooks)
add_subdirectory(resources)
+add_subdirectory(splitter)
add_subdirectory(ut_rw)
add_subdirectory(ut_schema)
get_built_tool_path(
@@ -49,6 +50,7 @@ target_link_libraries(core-tx-columnshard PUBLIC
columnshard-engines-writer
tx-columnshard-counters
tx-columnshard-common
+ tx-columnshard-splitter
core-tx-tiering
tx-conveyor-usage
tx-long_tx_service-public
diff --git a/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt
index 85db168ab0f..07c0eaea85c 100644
--- a/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt
@@ -11,6 +11,7 @@ add_subdirectory(counters)
add_subdirectory(engines)
add_subdirectory(hooks)
add_subdirectory(resources)
+add_subdirectory(splitter)
add_subdirectory(ut_rw)
add_subdirectory(ut_schema)
get_built_tool_path(
@@ -48,6 +49,7 @@ target_link_libraries(core-tx-columnshard PUBLIC
columnshard-engines-writer
tx-columnshard-counters
tx-columnshard-common
+ tx-columnshard-splitter
core-tx-tiering
tx-conveyor-usage
tx-long_tx_service-public
diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp
index e1a13775299..990415922de 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.cpp
+++ b/ydb/core/tx/columnshard/columnshard_impl.cpp
@@ -838,7 +838,8 @@ std::unique_ptr<TEvPrivate::TEvWriteIndex> TColumnShard::SetupCleanup() {
NOlap::TSnapshot cleanupSnapshot{GetMinReadStep(), 0};
- auto changes = TablesManager.StartIndexCleanup(cleanupSnapshot, CompactionLimits.Get(), TLimits::MAX_TX_RECORDS);
+ auto changes =
+ TablesManager.MutablePrimaryIndex().StartCleanup(cleanupSnapshot, CompactionLimits.Get(), TablesManager.MutablePathsToDrop(), TLimits::MAX_TX_RECORDS);
if (!changes) {
LOG_S_INFO("Cannot prepare cleanup at tablet " << TabletID());
return {};
diff --git a/ydb/core/tx/columnshard/compaction_actor.cpp b/ydb/core/tx/columnshard/compaction_actor.cpp
index e9adfe87a5b..c771ce04c59 100644
--- a/ydb/core/tx/columnshard/compaction_actor.cpp
+++ b/ydb/core/tx/columnshard/compaction_actor.cpp
@@ -2,7 +2,6 @@
#include "blob_cache.h"
#include <ydb/core/tx/columnshard/engines/column_engine_logs.h>
-#include <ydb/core/tx/columnshard/engines/index_logic_logs.h>
#include <ydb/core/tx/conveyor/usage/events.h>
#include <ydb/core/tx/conveyor/usage/service.h>
@@ -135,8 +134,8 @@ private:
virtual bool DoExecute() override {
auto guard = TxEvent->PutResult->StartCpuGuard();
- NOlap::TCompactionLogic compactionLogic(TxEvent->IndexInfo, TxEvent->Tiering, Counters);
- TxEvent->Blobs = std::move(compactionLogic.Apply(TxEvent->IndexChanges).DetachResult());
+ NOlap::TConstructionContext context(TxEvent->IndexInfo, TxEvent->Tiering, Counters);
+ TxEvent->Blobs = std::move(TxEvent->IndexChanges->ConstructBlobs(context).DetachResult());
return true;
}
public:
diff --git a/ydb/core/tx/columnshard/engines/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/engines/CMakeLists.darwin-x86_64.txt
index 8c5b9dab27a..94f29e68936 100644
--- a/ydb/core/tx/columnshard/engines/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/tx/columnshard/engines/CMakeLists.darwin-x86_64.txt
@@ -47,7 +47,6 @@ target_sources(tx-columnshard-engines PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/db_wrapper.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/index_info.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/index_logic_logs.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/filter.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portion_info.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/tier_info.cpp
diff --git a/ydb/core/tx/columnshard/engines/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/engines/CMakeLists.linux-aarch64.txt
index 7faf17091fd..08ceed7b245 100644
--- a/ydb/core/tx/columnshard/engines/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/tx/columnshard/engines/CMakeLists.linux-aarch64.txt
@@ -48,7 +48,6 @@ target_sources(tx-columnshard-engines PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/db_wrapper.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/index_info.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/index_logic_logs.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/filter.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portion_info.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/tier_info.cpp
diff --git a/ydb/core/tx/columnshard/engines/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/engines/CMakeLists.linux-x86_64.txt
index 7faf17091fd..08ceed7b245 100644
--- a/ydb/core/tx/columnshard/engines/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/tx/columnshard/engines/CMakeLists.linux-x86_64.txt
@@ -48,7 +48,6 @@ target_sources(tx-columnshard-engines PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/db_wrapper.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/index_info.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/index_logic_logs.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/filter.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portion_info.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/tier_info.cpp
diff --git a/ydb/core/tx/columnshard/engines/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/engines/CMakeLists.windows-x86_64.txt
index 8c5b9dab27a..94f29e68936 100644
--- a/ydb/core/tx/columnshard/engines/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/tx/columnshard/engines/CMakeLists.windows-x86_64.txt
@@ -47,7 +47,6 @@ target_sources(tx-columnshard-engines PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/db_wrapper.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/index_info.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/index_logic_logs.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/filter.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portion_info.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/tier_info.cpp
diff --git a/ydb/core/tx/columnshard/engines/changes/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/engines/changes/CMakeLists.darwin-x86_64.txt
index eae85c341ba..922cccb5001 100644
--- a/ydb/core/tx/columnshard/engines/changes/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/tx/columnshard/engines/changes/CMakeLists.darwin-x86_64.txt
@@ -20,6 +20,7 @@ target_link_libraries(columnshard-engines-changes PUBLIC
core-formats-arrow
tx-columnshard-common
columnshard-engines-insert_table
+ tx-columnshard-splitter
ydb-core-tablet_flat
core-tx-tiering
ydb-core-protos
@@ -29,6 +30,8 @@ target_sources(columnshard-engines-changes PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/abstract.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/mark.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/compaction.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/in_granule_compaction.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/split_compaction.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/ttl.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/indexation.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/cleanup.cpp
diff --git a/ydb/core/tx/columnshard/engines/changes/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/engines/changes/CMakeLists.linux-aarch64.txt
index c7f073d7d3a..bf1a569276b 100644
--- a/ydb/core/tx/columnshard/engines/changes/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/tx/columnshard/engines/changes/CMakeLists.linux-aarch64.txt
@@ -21,6 +21,7 @@ target_link_libraries(columnshard-engines-changes PUBLIC
core-formats-arrow
tx-columnshard-common
columnshard-engines-insert_table
+ tx-columnshard-splitter
ydb-core-tablet_flat
core-tx-tiering
ydb-core-protos
@@ -30,6 +31,8 @@ target_sources(columnshard-engines-changes PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/abstract.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/mark.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/compaction.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/in_granule_compaction.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/split_compaction.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/ttl.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/indexation.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/cleanup.cpp
diff --git a/ydb/core/tx/columnshard/engines/changes/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/engines/changes/CMakeLists.linux-x86_64.txt
index c7f073d7d3a..bf1a569276b 100644
--- a/ydb/core/tx/columnshard/engines/changes/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/tx/columnshard/engines/changes/CMakeLists.linux-x86_64.txt
@@ -21,6 +21,7 @@ target_link_libraries(columnshard-engines-changes PUBLIC
core-formats-arrow
tx-columnshard-common
columnshard-engines-insert_table
+ tx-columnshard-splitter
ydb-core-tablet_flat
core-tx-tiering
ydb-core-protos
@@ -30,6 +31,8 @@ target_sources(columnshard-engines-changes PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/abstract.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/mark.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/compaction.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/in_granule_compaction.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/split_compaction.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/ttl.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/indexation.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/cleanup.cpp
diff --git a/ydb/core/tx/columnshard/engines/changes/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/engines/changes/CMakeLists.windows-x86_64.txt
index eae85c341ba..922cccb5001 100644
--- a/ydb/core/tx/columnshard/engines/changes/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/tx/columnshard/engines/changes/CMakeLists.windows-x86_64.txt
@@ -20,6 +20,7 @@ target_link_libraries(columnshard-engines-changes PUBLIC
core-formats-arrow
tx-columnshard-common
columnshard-engines-insert_table
+ tx-columnshard-splitter
ydb-core-tablet_flat
core-tx-tiering
ydb-core-protos
@@ -29,6 +30,8 @@ target_sources(columnshard-engines-changes PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/abstract.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/mark.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/compaction.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/in_granule_compaction.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/split_compaction.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/ttl.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/indexation.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/cleanup.cpp
diff --git a/ydb/core/tx/columnshard/engines/changes/abstract.cpp b/ydb/core/tx/columnshard/engines/changes/abstract.cpp
index 332b66977a4..323c59c07cb 100644
--- a/ydb/core/tx/columnshard/engines/changes/abstract.cpp
+++ b/ydb/core/tx/columnshard/engines/changes/abstract.cpp
@@ -11,6 +11,84 @@ TString TColumnEngineChanges::DebugString() const {
return sb.Str();
}
+NKikimr::TConclusion<std::vector<TString>> TColumnEngineChanges::ConstructBlobs(TConstructionContext& context) {
+ Y_VERIFY(Stage == EStage::Started);
+
+ {
+ ui64 readBytes = 0;
+ for (auto&& i : Blobs) {
+ readBytes += i.first.Size;
+ }
+ context.Counters.CompactionInputSize(readBytes);
+ }
+ const TMonotonic start = TMonotonic::Now();
+ TConclusion<std::vector<TString>> result = DoConstructBlobs(context);
+ if (result.IsSuccess()) {
+ context.Counters.CompactionDuration->Collect((TMonotonic::Now() - start).MilliSeconds());
+ } else {
+ context.Counters.CompactionFails->Add(1);
+ }
+ Stage = EStage::Constructed;
+ return result;
+}
+
+bool TColumnEngineChanges::ApplyChanges(TColumnEngineForLogs& self, TApplyChangesContext& context, const bool dryRun) {
+ Y_VERIFY(Stage != EStage::Aborted);
+ if ((ui32)Stage >= (ui32)EStage::Applied) {
+ return true;
+ }
+ Y_VERIFY(Stage == EStage::Compiled);
+
+ if (!DoApplyChanges(self, context, dryRun)) {
+ if (dryRun) {
+ OnChangesApplyFailed("problems on apply");
+ }
+ Y_VERIFY(dryRun);
+ return false;
+ } else if (!dryRun) {
+ OnChangesApplyFinished();
+ Stage = EStage::Applied;
+ }
+ return true;
+}
+
+void TColumnEngineChanges::WriteIndex(NColumnShard::TColumnShard& self, TWriteIndexContext& context) {
+ Y_VERIFY(Stage != EStage::Aborted);
+ if ((ui32)Stage >= (ui32)EStage::Written) {
+ return;
+ }
+ Y_VERIFY(Stage == EStage::Applied);
+
+ DoWriteIndex(self, context);
+ Stage = EStage::Written;
+}
+
+void TColumnEngineChanges::WriteIndexComplete(NColumnShard::TColumnShard& self, TWriteIndexCompleteContext& context) {
+ Y_VERIFY(Stage == EStage::Aborted || Stage == EStage::Written);
+ if (Stage == EStage::Aborted) {
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "WriteIndexComplete")("stage", Stage);
+ return;
+ }
+ if (Stage == EStage::Written) {
+ Stage = EStage::Finished;
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "WriteIndexComplete")("type", TypeString())("success", context.FinishedSuccessfully);
+ DoWriteIndexComplete(self, context);
+ DoOnFinish(self, context);
+ }
+}
+
+void TColumnEngineChanges::Compile(TFinalizationContext& context) noexcept {
+ Y_VERIFY(Stage != EStage::Aborted);
+ if ((ui32)Stage >= (ui32)EStage::Compiled) {
+ return;
+ }
+ Y_VERIFY(Stage == EStage::Constructed);
+
+ DoCompile(context);
+
+ Stage = EStage::Compiled;
+}
+
TWriteIndexContext::TWriteIndexContext(NTabletFlatExecutor::TTransactionContext& txc, IDbWrapper& dbWrapper)
: Txc(txc)
, BlobManagerDb(std::make_shared<NColumnShard::TBlobManagerDb>(txc.DB))
diff --git a/ydb/core/tx/columnshard/engines/changes/abstract.h b/ydb/core/tx/columnshard/engines/changes/abstract.h
index 49e4c967046..618aab096c6 100644
--- a/ydb/core/tx/columnshard/engines/changes/abstract.h
+++ b/ydb/core/tx/columnshard/engines/changes/abstract.h
@@ -23,6 +23,7 @@ class TBackgroundActivity;
namespace NKikimr::NOlap {
class TColumnEngineForLogs;
+class TVersionedIndex;
struct TCompactionLimits {
static constexpr const ui64 MIN_GOOD_BLOB_SIZE = 256 * 1024; // some BlobStorage constant
@@ -134,6 +135,37 @@ public:
}
};
+class TConstructionContext: TNonCopyable {
+public:
+ using TTieringsHash = THashMap<ui64, NKikimr::NOlap::TTiering>;
+private:
+ const TTieringsHash* TieringMap = nullptr;
+public:
+ const TVersionedIndex& SchemaVersions;
+ const NColumnShard::TIndexationCounters Counters;
+
+ TConstructionContext(const TVersionedIndex& schemaVersions, const TTieringsHash& tieringMap, const NColumnShard::TIndexationCounters counters)
+ : TieringMap(&tieringMap)
+ , SchemaVersions(schemaVersions)
+ , Counters(counters)
+ {
+
+ }
+
+ TConstructionContext(const TVersionedIndex& schemaVersions, const NColumnShard::TIndexationCounters counters)
+ : SchemaVersions(schemaVersions)
+ , Counters(counters) {
+
+ }
+
+ const THashMap<ui64, NKikimr::NOlap::TTiering>& GetTieringMap() const {
+ if (TieringMap) {
+ return *TieringMap;
+ }
+ return Default<THashMap<ui64, NKikimr::NOlap::TTiering>>();
+ }
+};
+
class TGranuleMeta;
class TColumnEngineChanges {
@@ -141,6 +173,7 @@ public:
enum class EStage: ui32 {
Created = 0,
Started,
+ Constructed,
Compiled,
Applied,
Written,
@@ -163,8 +196,15 @@ protected:
virtual void DoAbort() {
}
+ virtual bool NeedConstruction() const {
+ return true;
+ }
virtual void DoStart(NColumnShard::TColumnShard& self) = 0;
+ virtual TConclusion<std::vector<TString>> DoConstructBlobs(TConstructionContext& context) noexcept = 0;
+
public:
+ TConclusion<std::vector<TString>> ConstructBlobs(TConstructionContext& context);
+
void AbortEmergency() {
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "AbortEmergency");
Stage = EStage::Aborted;
@@ -178,80 +218,37 @@ public:
}
virtual ~TColumnEngineChanges() {
- Y_VERIFY(Stage == EStage::Created || Stage == EStage::Finished || Stage == EStage::Aborted);
+ Y_VERIFY_DEBUG(Stage == EStage::Created || Stage == EStage::Finished || Stage == EStage::Aborted);
}
void Start(NColumnShard::TColumnShard& self) {
Y_VERIFY(Stage == EStage::Created);
DoStart(self);
Stage = EStage::Started;
+ if (!NeedConstruction()) {
+ Stage = EStage::Constructed;
+ }
}
void StartEmergency() {
Y_VERIFY(Stage == EStage::Created);
Stage = EStage::Started;
- }
-
- bool ApplyChanges(TColumnEngineForLogs& self, TApplyChangesContext& context, const bool dryRun) {
- Y_VERIFY(Stage != EStage::Aborted);
- if ((ui32)Stage >= (ui32)EStage::Applied) {
- return true;
+ if (!NeedConstruction()) {
+ Stage = EStage::Constructed;
}
- Y_VERIFY(Stage == EStage::Compiled);
-
- if (!DoApplyChanges(self, context, dryRun)) {
- if (dryRun) {
- OnChangesApplyFailed("problems on apply");
- }
- Y_VERIFY(dryRun);
- return false;
- } else if (!dryRun) {
- OnChangesApplyFinished();
- Stage = EStage::Applied;
- }
- return true;
}
+ bool ApplyChanges(TColumnEngineForLogs& self, TApplyChangesContext& context, const bool dryRun);
+
virtual ui32 GetWritePortionsCount() const = 0;
virtual const TPortionInfo& GetWritePortionInfo(const ui32 index) const = 0;
virtual bool NeedWritePortion(const ui32 index) const = 0;
virtual void UpdateWritePortionInfo(const ui32 index, const TPortionInfo& info) = 0;
- void WriteIndex(NColumnShard::TColumnShard& self, TWriteIndexContext& context) {
- Y_VERIFY(Stage != EStage::Aborted);
- if ((ui32)Stage >= (ui32)EStage::Written) {
- return;
- }
- Y_VERIFY(Stage == EStage::Applied);
-
- DoWriteIndex(self, context);
- Stage = EStage::Written;
- }
- void WriteIndexComplete(NColumnShard::TColumnShard& self, TWriteIndexCompleteContext& context) {
- Y_VERIFY(Stage == EStage::Aborted || Stage == EStage::Written);
- if (Stage == EStage::Aborted) {
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "WriteIndexComplete")("stage", Stage);
- return;
- }
- if (Stage == EStage::Written) {
- Stage = EStage::Finished;
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "WriteIndexComplete")("type", TypeString())("success", context.FinishedSuccessfully);
- DoWriteIndexComplete(self, context);
- DoOnFinish(self, context);
- }
- }
+ void WriteIndex(NColumnShard::TColumnShard& self, TWriteIndexContext& context);
+ void WriteIndexComplete(NColumnShard::TColumnShard& self, TWriteIndexCompleteContext& context);
- void Compile(TFinalizationContext& context) noexcept {
- Y_VERIFY(Stage != EStage::Aborted);
- if ((ui32)Stage >= (ui32)EStage::Compiled) {
- return;
- }
- Y_VERIFY(Stage == EStage::Started);
-
- DoCompile(context);
-
- Stage = EStage::Compiled;
- }
+ void Compile(TFinalizationContext& context) noexcept;
void SetBlobs(THashMap<TBlobRange, TString>&& blobs) {
Y_VERIFY(!blobs.empty());
@@ -260,7 +257,6 @@ public:
TSnapshot InitSnapshot = TSnapshot::Zero();
THashMap<TBlobRange, TString> Blobs;
- bool NeedRepeat{false};
virtual THashMap<TUnifiedBlobId, std::vector<TBlobRange>> GetGroupedBlobRanges() const = 0;
virtual TString TypeString() const = 0;
diff --git a/ydb/core/tx/columnshard/engines/changes/cleanup.h b/ydb/core/tx/columnshard/engines/changes/cleanup.h
index 70e349ba186..33c971b6903 100644
--- a/ydb/core/tx/columnshard/engines/changes/cleanup.h
+++ b/ydb/core/tx/columnshard/engines/changes/cleanup.h
@@ -15,8 +15,16 @@ protected:
virtual void DoWriteIndex(NColumnShard::TColumnShard& self, TWriteIndexContext& context) override;
virtual void DoCompile(TFinalizationContext& /*context*/) override {
}
+ virtual NKikimr::TConclusion<std::vector<TString>> DoConstructBlobs(TConstructionContext& /*context*/) noexcept override {
+ return std::vector<TString>();
+ }
+ virtual bool NeedConstruction() const override {
+ return false;
+ }
public:
std::vector<TPortionInfo> PortionsToDrop;
+ bool NeedRepeat = false;
+
virtual THashMap<TUnifiedBlobId, std::vector<TBlobRange>> GetGroupedBlobRanges() const override {
return {};
}
diff --git a/ydb/core/tx/columnshard/engines/changes/in_granule_compaction.cpp b/ydb/core/tx/columnshard/engines/changes/in_granule_compaction.cpp
new file mode 100644
index 00000000000..d50e17de65a
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/changes/in_granule_compaction.cpp
@@ -0,0 +1,68 @@
+#include "in_granule_compaction.h"
+
+namespace NKikimr::NOlap {
+
+std::pair<std::shared_ptr<arrow::RecordBatch>, TSnapshot> TInGranuleCompactColumnEngineChanges::CompactInOneGranule(ui64 granule,
+ const std::vector<TPortionInfo>& portions,
+ const THashMap<TBlobRange, TString>& blobs, TConstructionContext& context) const {
+ std::vector<std::shared_ptr<arrow::RecordBatch>> batches;
+ batches.reserve(portions.size());
+
+ auto resultSchema = context.SchemaVersions.GetLastSchema();
+
+ TSnapshot maxSnapshot = resultSchema->GetSnapshot();
+ for (auto& portionInfo : portions) {
+ Y_VERIFY(!portionInfo.Empty());
+ Y_VERIFY(portionInfo.Granule() == granule);
+ auto blobSchema = context.SchemaVersions.GetSchema(portionInfo.GetSnapshot());
+ auto batch = portionInfo.AssembleInBatch(*blobSchema, *resultSchema, blobs);
+ batches.push_back(batch);
+ if (portionInfo.GetSnapshot() > maxSnapshot) {
+ maxSnapshot = portionInfo.GetSnapshot();
+ }
+ }
+
+ auto sortedBatch = NArrow::CombineSortedBatches(batches, resultSchema->GetIndexInfo().SortReplaceDescription());
+ Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(sortedBatch, resultSchema->GetIndexInfo().GetReplaceKey()));
+
+ return std::make_pair(sortedBatch, maxSnapshot);
+}
+
+TConclusion<std::vector<TString>> TInGranuleCompactColumnEngineChanges::DoConstructBlobs(TConstructionContext& context) noexcept {
+ const ui64 pathId = SrcGranule->PathId;
+ std::vector<TString> blobs;
+ auto& switchedPortions = SwitchedPortions;
+ Y_VERIFY(switchedPortions.size());
+
+ ui64 granule = switchedPortions[0].Granule();
+ auto [batch, maxSnapshot] = CompactInOneGranule(granule, switchedPortions, Blobs, context);
+
+ auto resultSchema = context.SchemaVersions.GetLastSchema();
+ std::vector<TPortionInfo> portions;
+ if (!MergeBorders.Empty()) {
+ Y_VERIFY(MergeBorders.GetOrderedMarks().size() > 1);
+ auto slices = MergeBorders.SliceIntoGranules(batch, resultSchema->GetIndexInfo());
+ portions.reserve(slices.size());
+
+ for (auto& [_, slice] : slices) {
+ if (!slice || slice->num_rows() == 0) {
+ continue;
+ }
+ auto tmp = MakeAppendedPortions(pathId, slice, granule, maxSnapshot, blobs, GetGranuleMeta(), context);
+ for (auto&& portionInfo : tmp) {
+ portions.emplace_back(std::move(portionInfo));
+ }
+ }
+ } else {
+ portions = MakeAppendedPortions(pathId, batch, granule, maxSnapshot, blobs, GetGranuleMeta(), context);
+ }
+
+ Y_VERIFY(portions.size() > 0);
+ Y_VERIFY(AppendedPortions.empty());
+ // Set appended portions.
+ AppendedPortions.swap(portions);
+
+ return blobs;
+}
+
+}
diff --git a/ydb/core/tx/columnshard/engines/changes/in_granule_compaction.h b/ydb/core/tx/columnshard/engines/changes/in_granule_compaction.h
new file mode 100644
index 00000000000..5a1cba91c9f
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/changes/in_granule_compaction.h
@@ -0,0 +1,20 @@
+#pragma once
+#include "compaction.h"
+
+namespace NKikimr::NOlap {
+
+class TGranuleMeta;
+
+class TInGranuleCompactColumnEngineChanges: public TCompactColumnEngineChanges {
+private:
+ using TBase = TCompactColumnEngineChanges;
+ std::pair<std::shared_ptr<arrow::RecordBatch>, TSnapshot> CompactInOneGranule(ui64 granule,
+ const std::vector<TPortionInfo>& portions, const THashMap<TBlobRange, TString>& blobs,
+ TConstructionContext& context) const;
+protected:
+ virtual TConclusion<std::vector<TString>> DoConstructBlobs(TConstructionContext& context) noexcept override;
+public:
+ using TBase::TBase;
+};
+
+}
diff --git a/ydb/core/tx/columnshard/engines/changes/indexation.cpp b/ydb/core/tx/columnshard/engines/changes/indexation.cpp
index 2e45057cf8f..83b6f562196 100644
--- a/ydb/core/tx/columnshard/engines/changes/indexation.cpp
+++ b/ydb/core/tx/columnshard/engines/changes/indexation.cpp
@@ -69,4 +69,85 @@ void TInsertColumnEngineChanges::DoOnFinish(NColumnShard::TColumnShard& self, TC
self.BackgroundController.FinishIndexing();
}
+NKikimr::TConclusion<std::vector<TString>> TInsertColumnEngineChanges::DoConstructBlobs(TConstructionContext& context) noexcept {
+ Y_VERIFY(!DataToIndex.empty());
+ Y_VERIFY(AppendedPortions.empty());
+
+ auto maxSnapshot = TSnapshot::Zero();
+ for (auto& inserted : DataToIndex) {
+ TSnapshot insertSnap = inserted.GetSnapshot();
+ Y_VERIFY(insertSnap.Valid());
+ if (insertSnap > maxSnapshot) {
+ maxSnapshot = insertSnap;
+ }
+ }
+ Y_VERIFY(maxSnapshot.Valid());
+
+ auto resultSchema = context.SchemaVersions.GetSchema(maxSnapshot);
+ Y_VERIFY(resultSchema->GetIndexInfo().IsSorted());
+
+ THashMap<ui64, std::vector<std::shared_ptr<arrow::RecordBatch>>> pathBatches;
+ for (auto& inserted : DataToIndex) {
+ TBlobRange blobRange(inserted.BlobId, 0, inserted.BlobId.BlobSize());
+
+ auto blobSchema = context.SchemaVersions.GetSchema(inserted.GetSchemaSnapshot());
+ auto& indexInfo = blobSchema->GetIndexInfo();
+ Y_VERIFY(indexInfo.IsSorted());
+
+ std::shared_ptr<arrow::RecordBatch> batch;
+ if (auto it = CachedBlobs.find(inserted.BlobId); it != CachedBlobs.end()) {
+ batch = it->second;
+ } else if (auto* blobData = Blobs.FindPtr(blobRange)) {
+ Y_VERIFY(!blobData->empty(), "Blob data not present");
+ // Prepare batch
+ batch = NArrow::DeserializeBatch(*blobData, indexInfo.ArrowSchema());
+ if (!batch) {
+ AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)
+ ("event", "cannot_parse")
+ ("data_snapshot", TStringBuilder() << inserted.GetSnapshot())
+ ("index_snapshot", TStringBuilder() << blobSchema->GetSnapshot());
+ }
+ } else {
+ Y_VERIFY(blobData, "Data for range %s has not been read", blobRange.ToString().c_str());
+ }
+ Y_VERIFY(batch);
+
+ batch = AddSpecials(batch, blobSchema->GetIndexInfo(), inserted);
+ batch = resultSchema->NormalizeBatch(*blobSchema, batch);
+ pathBatches[inserted.PathId].push_back(batch);
+ Y_VERIFY_DEBUG(NArrow::IsSorted(pathBatches[inserted.PathId].back(), resultSchema->GetIndexInfo().GetReplaceKey()));
+ }
+ std::vector<TString> blobs;
+
+ for (auto& [pathId, batches] : pathBatches) {
+ AddPathIfNotExists(pathId);
+
+ // We could merge data here because the tablet limits indexed data portions
+ auto merged = NArrow::CombineSortedBatches(batches, resultSchema->GetIndexInfo().SortReplaceDescription());
+ Y_VERIFY(merged);
+ Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(merged, resultSchema->GetIndexInfo().GetReplaceKey()));
+
+ auto granuleBatches = TMarksGranules::SliceIntoGranules(merged, PathToGranule[pathId], resultSchema->GetIndexInfo());
+ for (auto& [granule, batch] : granuleBatches) {
+ auto portions = MakeAppendedPortions(pathId, batch, granule, maxSnapshot, blobs, GetGranuleMeta(), context);
+ Y_VERIFY(portions.size() > 0);
+ for (auto& portion : portions) {
+ AppendedPortions.emplace_back(std::move(portion));
+ }
+ }
+ }
+
+ Y_VERIFY(PathToGranule.size() == pathBatches.size());
+ return blobs;
+}
+
+std::shared_ptr<arrow::RecordBatch> TInsertColumnEngineChanges::AddSpecials(const std::shared_ptr<arrow::RecordBatch>& srcBatch,
+ const TIndexInfo& indexInfo, const TInsertedData& inserted) const
+{
+ auto batch = TIndexInfo::AddSpecialColumns(srcBatch, inserted.GetSnapshot());
+ Y_VERIFY(batch);
+
+ return NArrow::ExtractColumns(batch, indexInfo.ArrowSchemaWithSpecials());
+}
+
}
diff --git a/ydb/core/tx/columnshard/engines/changes/indexation.h b/ydb/core/tx/columnshard/engines/changes/indexation.h
index 852583c730d..77fc125dd08 100644
--- a/ydb/core/tx/columnshard/engines/changes/indexation.h
+++ b/ydb/core/tx/columnshard/engines/changes/indexation.h
@@ -9,12 +9,16 @@ namespace NKikimr::NOlap {
class TInsertColumnEngineChanges: public TChangesWithAppend {
private:
using TBase = TChangesWithAppend;
+ std::shared_ptr<arrow::RecordBatch> AddSpecials(const std::shared_ptr<arrow::RecordBatch>& srcBatch,
+ const TIndexInfo& indexInfo, const TInsertedData& inserted) const;
+
protected:
virtual void DoStart(NColumnShard::TColumnShard& self) override;
virtual void DoWriteIndexComplete(NColumnShard::TColumnShard& self, TWriteIndexCompleteContext& context) override;
virtual bool DoApplyChanges(TColumnEngineForLogs& self, TApplyChangesContext& context, const bool dryRun) override;
virtual void DoOnFinish(NColumnShard::TColumnShard& self, TChangesFinishContext& context) override;
virtual void DoWriteIndex(NColumnShard::TColumnShard& self, TWriteIndexContext& context) override;
+ virtual TConclusion<std::vector<TString>> DoConstructBlobs(TConstructionContext& context) noexcept override;
public:
const TMark DefaultMark;
THashMap<ui64, std::vector<std::pair<TMark, ui64>>> PathToGranule; // pathId -> {mark, granule}
diff --git a/ydb/core/tx/columnshard/engines/changes/mark_granules.cpp b/ydb/core/tx/columnshard/engines/changes/mark_granules.cpp
index 5530e2bfa75..57ede5644a6 100644
--- a/ydb/core/tx/columnshard/engines/changes/mark_granules.cpp
+++ b/ydb/core/tx/columnshard/engines/changes/mark_granules.cpp
@@ -1,5 +1,4 @@
#include "mark_granules.h"
-#include <ydb/core/tx/columnshard/engines/index_logic_logs.h>
namespace NKikimr::NOlap {
TMarksGranules::TMarksGranules(std::vector<TPair>&& marks) noexcept
diff --git a/ydb/core/tx/columnshard/engines/changes/split_compaction.cpp b/ydb/core/tx/columnshard/engines/changes/split_compaction.cpp
new file mode 100644
index 00000000000..cf19d14241b
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/changes/split_compaction.cpp
@@ -0,0 +1,391 @@
+#include "split_compaction.h"
+
+namespace NKikimr::NOlap {
+
+TConclusion<std::vector<TString>> TSplitCompactColumnEngineChanges::DoConstructBlobs(TConstructionContext& context) noexcept {
+ const ui64 pathId = SrcGranule->PathId;
+ const TMark ts0 = SrcGranule->Mark;
+ std::vector<TPortionInfo>& portions = SwitchedPortions;
+
+ std::vector<std::pair<TMark, ui64>> tsIds;
+ ui64 movedRows = TryMovePortions(ts0, portions, tsIds, PortionsToMove, context);
+ auto [srcBatches, maxSnapshot] = PortionsToBatches(portions, Blobs, movedRows != 0, context);
+ Y_VERIFY(srcBatches.size() == portions.size());
+
+ std::vector<TString> blobs;
+ auto resultSchema = context.SchemaVersions.GetLastSchema();
+ if (movedRows) {
+ Y_VERIFY(PortionsToMove.size() >= 2);
+ Y_VERIFY(PortionsToMove.size() == tsIds.size());
+ Y_VERIFY(tsIds.begin()->first == ts0);
+
+ // Calculate total number of rows.
+ ui64 numRows = movedRows;
+ for (const auto& batch : srcBatches) {
+ numRows += batch->num_rows();
+ }
+
+ // Recalculate new granule borders (if they are larger than portions)
+ ui32 numSplitInto = NumSplitInto(numRows);
+ if (numSplitInto < tsIds.size()) {
+ const ui32 rowsInGranule = numRows / numSplitInto;
+ Y_VERIFY(rowsInGranule);
+
+ std::vector<std::pair<TMark, ui64>> newTsIds;
+ ui32 tmpGranule = 0;
+ ui32 sumRows = 0;
+ // Always insert mark of the source granule at the beginning.
+ newTsIds.emplace_back(ts0, 1);
+
+ for (size_t i = 0, end = tsIds.size(); i != end; ++i) {
+ const TMark& ts = tsIds[i].first;
+ // Make a new granule if the current number of rows exceeds the allowed number of rows in the granule,
+ // or we have reached the end of the ids and nothing has been inserted so far.
+ if (sumRows >= rowsInGranule || (i + 1 == end && newTsIds.size() == 1)) {
+ ++tmpGranule;
+ newTsIds.emplace_back(ts, tmpGranule + 1);
+ sumRows = 0;
+ }
+
+ auto& toMove = PortionsToMove[i];
+ sumRows += toMove.first.NumRows();
+ toMove.second = tmpGranule;
+ }
+
+ tsIds.swap(newTsIds);
+ }
+ Y_VERIFY(tsIds.size() > 1);
+ Y_VERIFY(tsIds[0] == std::make_pair(ts0, ui64(1)));
+ TMarksGranules marksGranules(std::move(tsIds));
+
+ // Slice inserted portions with granules' borders
+ THashMap<ui64, std::vector<std::shared_ptr<arrow::RecordBatch>>> idBatches;
+ std::vector<TPortionInfo*> toSwitch;
+ toSwitch.reserve(portions.size());
+ for (size_t i = 0; i < portions.size(); ++i) {
+ auto& portion = portions[i];
+ auto& batch = srcBatches[i];
+ auto slices = marksGranules.SliceIntoGranules(batch, resultSchema->GetIndexInfo());
+
+ THashSet<ui64> ids;
+ for (auto& [id, slice] : slices) {
+ if (slice && slice->num_rows()) {
+ ids.insert(id);
+ idBatches[id].emplace_back(std::move(slice));
+ }
+ }
+
+ // Optimization: move inserted portions that were not split. Do not re-append them.
+ if (ids.size() == 1) {
+ ui64 id = *ids.begin();
+ idBatches[id].resize(idBatches[id].size() - 1);
+ ui64 tmpGranule = id - 1;
+ PortionsToMove.emplace_back(std::move(portion), tmpGranule);
+ } else {
+ toSwitch.push_back(&portion);
+ }
+ }
+
+ // Update switchedPortions if we have moves
+ if (toSwitch.size() != portions.size()) {
+ std::vector<TPortionInfo> tmp;
+ tmp.reserve(toSwitch.size());
+ for (auto* portionInfo : toSwitch) {
+ tmp.emplace_back(std::move(*portionInfo));
+ }
+ portions.swap(tmp);
+ }
+
+ for (const auto& [mark, id] : marksGranules.GetOrderedMarks()) {
+ ui64 tmpGranule = SetTmpGranule(pathId, mark);
+ for (const auto& batch : idBatches[id]) {
+ // Cannot set snapshot here. It would be set in committing transaction in ApplyChanges().
+ auto newPortions = MakeAppendedPortions(pathId, batch, tmpGranule, maxSnapshot, blobs, GetGranuleMeta(), context);
+ Y_VERIFY(newPortions.size() > 0);
+ for (auto& portion : newPortions) {
+ AppendedPortions.emplace_back(std::move(portion));
+ }
+ }
+ }
+ } else {
+ auto batches = SliceGranuleBatches(resultSchema->GetIndexInfo(), srcBatches, ts0);
+
+ SetTmpGranule(pathId, ts0);
+ for (auto& [ts, batch] : batches) {
+ // Tmp granule would be updated to correct value in ApplyChanges()
+ ui64 tmpGranule = SetTmpGranule(pathId, ts);
+
+ // Cannot set snapshot here. It would be set in committing transaction in ApplyChanges().
+ auto portions = MakeAppendedPortions(pathId, batch, tmpGranule, maxSnapshot, blobs, GetGranuleMeta(), context);
+ Y_VERIFY(portions.size() > 0);
+ for (auto& portion : portions) {
+ AppendedPortions.emplace_back(std::move(portion));
+ }
+ }
+ }
+
+ return blobs;
+}
+
+std::pair<std::vector<std::shared_ptr<arrow::RecordBatch>>, NKikimr::NOlap::TSnapshot> TSplitCompactColumnEngineChanges::PortionsToBatches(
+ const std::vector<TPortionInfo>& portions, const THashMap<TBlobRange, TString>& blobs, const bool insertedOnly, TConstructionContext& context) {
+ std::vector<std::shared_ptr<arrow::RecordBatch>> batches;
+ batches.reserve(portions.size());
+ const auto resultSchema = context.SchemaVersions.GetLastSchema();
+ TSnapshot maxSnapshot = resultSchema->GetSnapshot();
+ for (const auto& portionInfo : portions) {
+ if (!insertedOnly || portionInfo.IsInserted()) {
+ const auto blobSchema = context.SchemaVersions.GetSchema(portionInfo.GetSnapshot());
+
+ batches.push_back(portionInfo.AssembleInBatch(*blobSchema, *resultSchema, blobs));
+
+ if (maxSnapshot < portionInfo.GetSnapshot()) {
+ maxSnapshot = portionInfo.GetSnapshot();
+ }
+ }
+ }
+ return std::make_pair(std::move(batches), maxSnapshot);
+}
+
+std::vector<std::pair<NKikimr::NOlap::TMark, std::shared_ptr<arrow::RecordBatch>>> TSplitCompactColumnEngineChanges::SliceGranuleBatches(const TIndexInfo& indexInfo,
+ const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches, const TMark& ts0) {
+ std::vector<std::pair<TMark, std::shared_ptr<arrow::RecordBatch>>> out;
+
+ // Extract unique effective keys and their counts
+ i64 numRows = 0;
+ TMap<NArrow::TReplaceKey, ui32> uniqKeyCount;
+ for (const auto& batch : batches) {
+ Y_VERIFY(batch);
+ if (batch->num_rows() == 0) {
+ continue;
+ }
+
+ numRows += batch->num_rows();
+
+ const auto effKey = TMarksGranules::GetEffectiveKey(batch, indexInfo);
+ Y_VERIFY(effKey->num_columns() && effKey->num_rows());
+
+ auto effColumns = std::make_shared<NArrow::TArrayVec>(effKey->columns());
+ for (int row = 0; row < effKey->num_rows(); ++row) {
+ ++uniqKeyCount[NArrow::TReplaceKey(effColumns, row)];
+ }
+ }
+
+ Y_VERIFY(uniqKeyCount.size());
+ auto minTs = uniqKeyCount.begin()->first;
+ auto maxTs = uniqKeyCount.rbegin()->first;
+ Y_VERIFY(minTs >= ts0.GetBorder());
+
+ // This is an estimate of the needed count because numRows is calculated before key replacement
+ ui32 rowsInGranule = numRows / NumSplitInto(numRows);
+ Y_VERIFY(rowsInGranule);
+
+ // Cannot split in case of one unique key
+ if (uniqKeyCount.size() == 1) {
+ // We have to split a big batch of the same key into several portions
+ auto merged = NArrow::MergeSortedBatches(batches, indexInfo.SortReplaceDescription(), rowsInGranule);
+ for (auto& batch : merged) {
+ Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(batch, indexInfo.GetReplaceKey()));
+ out.emplace_back(ts0, batch);
+ }
+ return out;
+ }
+
+ // Make split borders from uniq keys
+ std::vector<NArrow::TReplaceKey> borders;
+ borders.reserve(numRows / rowsInGranule);
+ {
+ ui32 sumRows = 0;
+ for (auto& [ts, num] : uniqKeyCount) {
+ if (sumRows >= rowsInGranule) {
+ borders.emplace_back(ts);
+ sumRows = 0;
+ }
+ sumRows += num;
+ }
+ if (borders.empty()) {
+ borders.emplace_back(maxTs); // huge trailing key
+ }
+ Y_VERIFY(borders.size());
+ }
+
+ // Find offsets in source batches
+ std::vector<std::vector<int>> offsets(batches.size()); // vec[batch][border] = offset
+ for (size_t i = 0; i < batches.size(); ++i) {
+ const auto& batch = batches[i];
+ auto& batchOffsets = offsets[i];
+ batchOffsets.reserve(borders.size() + 1);
+
+ const auto effKey = TMarksGranules::GetEffectiveKey(batch, indexInfo);
+ Y_VERIFY(effKey->num_columns() && effKey->num_rows());
+
+ std::vector<NArrow::TRawReplaceKey> keys;
+ {
+ const auto& columns = effKey->columns();
+ keys.reserve(effKey->num_rows());
+ for (i64 i = 0; i < effKey->num_rows(); ++i) {
+ keys.emplace_back(NArrow::TRawReplaceKey(&columns, i));
+ }
+ }
+
+ batchOffsets.push_back(0);
+ for (const auto& border : borders) {
+ int offset = NArrow::LowerBound(keys, border, batchOffsets.back());
+ Y_VERIFY(offset >= batchOffsets.back());
+ Y_VERIFY(offset <= batch->num_rows());
+ batchOffsets.push_back(offset);
+ }
+
+ Y_VERIFY(batchOffsets.size() == borders.size() + 1);
+ }
+
+ // Make a merge-sorted granule batch for each split granule
+ for (ui32 granuleNo = 0; granuleNo < borders.size() + 1; ++granuleNo) {
+ std::vector<std::shared_ptr<arrow::RecordBatch>> granuleBatches;
+ granuleBatches.reserve(batches.size());
+ const bool lastGranule = (granuleNo == borders.size());
+
+ // Extract granule: slice source batches with offsets
+ i64 granuleNumRows = 0;
+ for (size_t i = 0; i < batches.size(); ++i) {
+ const auto& batch = batches[i];
+ auto& batchOffsets = offsets[i];
+
+ int offset = batchOffsets[granuleNo];
+ int end = lastGranule ? batch->num_rows() : batchOffsets[granuleNo + 1];
+ int size = end - offset;
+ Y_VERIFY(size >= 0);
+
+ if (size) {
+ Y_VERIFY(offset < batch->num_rows());
+ auto slice = batch->Slice(offset, size);
+ Y_VERIFY(slice->num_rows());
+ granuleNumRows += slice->num_rows();
+#if 1 // Check correctness
+ const auto effKey = TMarksGranules::GetEffectiveKey(slice, indexInfo);
+ Y_VERIFY(effKey->num_columns() && effKey->num_rows());
+
+ auto startKey = granuleNo ? borders[granuleNo - 1] : minTs;
+ Y_VERIFY(NArrow::TReplaceKey::FromBatch(effKey, 0) >= startKey);
+
+ NArrow::TReplaceKey lastSliceKey = NArrow::TReplaceKey::FromBatch(effKey, effKey->num_rows() - 1);
+ if (granuleNo < borders.size() - 1) {
+ const auto& endKey = borders[granuleNo];
+ Y_VERIFY(lastSliceKey < endKey);
+ } else {
+ Y_VERIFY(lastSliceKey <= maxTs);
+ }
+#endif
+ Y_VERIFY_DEBUG(NArrow::IsSorted(slice, indexInfo.GetReplaceKey()));
+ granuleBatches.emplace_back(slice);
+ }
+ }
+
+ // Merge slices. We have to split big batches of the same key into several ones here.
+ if (granuleNumRows > 4 * rowsInGranule) {
+ granuleNumRows = rowsInGranule;
+ }
+ auto merged = NArrow::MergeSortedBatches(granuleBatches, indexInfo.SortReplaceDescription(), granuleNumRows);
+ for (auto& batch : merged) {
+ Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(batch, indexInfo.GetReplaceKey()));
+
+ auto startKey = ts0.GetBorder();
+ if (granuleNo) {
+ startKey = borders[granuleNo - 1];
+ }
+#if 1 // Check correctness
+ const auto effKey = TMarksGranules::GetEffectiveKey(batch, indexInfo);
+ Y_VERIFY(effKey->num_columns() && effKey->num_rows());
+
+ Y_VERIFY(NArrow::TReplaceKey::FromBatch(effKey, 0) >= startKey);
+#endif
+ out.emplace_back(TMark(startKey), batch);
+ }
+ }
+
+ return out;
+}
+
+ui64 TSplitCompactColumnEngineChanges::TryMovePortions(const TMark& ts0, std::vector<TPortionInfo>& portions,
+ std::vector<std::pair<TMark, ui64>>& tsIds, std::vector<std::pair<TPortionInfo, ui64>>& toMove, TConstructionContext& context)
+{
+ std::vector<TPortionInfo*> partitioned(portions.size());
+ // Split portions by putting the inserted portions in the original order
+ // at the beginning of the buffer and the compacted portions at the end.
+ // The compacted portions will be put in reversed order, but they will be sorted later.
+ const auto [inserted, compacted] = [&]() {
+ size_t l = 0;
+ size_t r = portions.size();
+
+ for (auto& portionInfo : portions) {
+ partitioned[(portionInfo.IsInserted() ? l++ : --r)] = &portionInfo;
+ }
+
+ return std::make_tuple(std::span(partitioned.begin(), l), std::span(partitioned.begin() + l, partitioned.end()));
+ }();
+
+ context.Counters.AnalizeCompactedPortions->Add(compacted.size());
+ context.Counters.AnalizeInsertedPortions->Add(inserted.size());
+ for (auto&& i : portions) {
+ if (i.IsInserted()) {
+ context.Counters.AnalizeInsertedBytes->Add(i.BlobsBytes());
+ } else {
+ context.Counters.AnalizeCompactedBytes->Add(i.BlobsBytes());
+ }
+ }
+
+ // Do nothing if there are less than two compacted portions.
+ if (compacted.size() < 2) {
+ return 0;
+ }
+ // Order compacted portions by primary key.
+ std::sort(compacted.begin(), compacted.end(), [](const TPortionInfo* a, const TPortionInfo* b) {
+ return a->IndexKeyStart() < b->IndexKeyStart();
+ });
+ for (auto&& i : inserted) {
+ context.Counters.RepackedInsertedPortionBytes->Add(i->BlobsBytes());
+ }
+ context.Counters.RepackedInsertedPortions->Add(inserted.size());
+
+ // Check that there are no gaps between two adjacent portions in terms of the primary key range.
+ for (size_t i = 0; i < compacted.size() - 1; ++i) {
+ if (compacted[i]->IndexKeyEnd() >= compacted[i + 1]->IndexKeyStart()) {
+ for (auto&& c : compacted) {
+ context.Counters.SkipPortionBytesMoveThroughIntersection->Add(c->BlobsBytes());
+ }
+
+ context.Counters.SkipPortionsMoveThroughIntersection->Add(compacted.size());
+ context.Counters.RepackedCompactedPortions->Add(compacted.size());
+ return 0;
+ }
+ }
+
+ toMove.reserve(compacted.size());
+ ui64 numRows = 0;
+ ui32 counter = 0;
+ for (auto* portionInfo : compacted) {
+ ui32 rows = portionInfo->NumRows();
+ Y_VERIFY(rows);
+ numRows += rows;
+ context.Counters.MovedPortionBytes->Add(portionInfo->BlobsBytes());
+ tsIds.emplace_back((counter ? TMark(portionInfo->IndexKeyStart()) : ts0), counter + 1);
+ toMove.emplace_back(std::move(*portionInfo), counter);
+ ++counter;
+ // Ensure that std::move will take effect.
+ static_assert(std::swappable<decltype(*portionInfo)>);
+ }
+ context.Counters.MovedPortions->Add(toMove.size());
+
+ std::vector<TPortionInfo> out;
+ out.reserve(inserted.size());
+ for (auto* portionInfo : inserted) {
+ out.emplace_back(std::move(*portionInfo));
+ // Ensure that std::move will take effect.
+ static_assert(std::swappable<decltype(*portionInfo)>);
+ }
+ portions.swap(out);
+
+ return numRows;
+}
+
+}
diff --git a/ydb/core/tx/columnshard/engines/changes/split_compaction.h b/ydb/core/tx/columnshard/engines/changes/split_compaction.h
new file mode 100644
index 00000000000..aed71cf5630
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/changes/split_compaction.h
@@ -0,0 +1,27 @@
+#pragma once
+#include "compaction.h"
+
+namespace NKikimr::NOlap {
+
+class TSplitCompactColumnEngineChanges: public TCompactColumnEngineChanges {
+private:
+ using TBase = TCompactColumnEngineChanges;
+ std::pair<std::vector<std::shared_ptr<arrow::RecordBatch>>, TSnapshot> PortionsToBatches(const std::vector<TPortionInfo>& portions, const THashMap<TBlobRange, TString>& blobs,
+ const bool insertedOnly, TConstructionContext& context);
+
+ std::vector<std::pair<TMark, std::shared_ptr<arrow::RecordBatch>>> SliceGranuleBatches(const TIndexInfo& indexInfo,
+ const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches,
+ const TMark& ts0);
+
+ ui64 TryMovePortions(const TMark& ts0,
+ std::vector<TPortionInfo>& portions,
+ std::vector<std::pair<TMark, ui64>>& tsIds,
+ std::vector<std::pair<TPortionInfo, ui64>>& toMove, TConstructionContext& context);
+
+protected:
+ virtual TConclusion<std::vector<TString>> DoConstructBlobs(TConstructionContext& context) noexcept override;
+public:
+ using TBase::TBase;
+};
+
+}
diff --git a/ydb/core/tx/columnshard/engines/changes/ttl.cpp b/ydb/core/tx/columnshard/engines/changes/ttl.cpp
index ed29f8091f6..dc5cc467c67 100644
--- a/ydb/core/tx/columnshard/engines/changes/ttl.cpp
+++ b/ydb/core/tx/columnshard/engines/changes/ttl.cpp
@@ -169,4 +169,78 @@ void TTTLColumnEngineChanges::DoOnFinish(NColumnShard::TColumnShard& self, TChan
self.BackgroundController.FinishTtl();
}
+bool TTTLColumnEngineChanges::UpdateEvictedPortion(TPortionInfo& portionInfo, TPortionEvictionFeatures& evictFeatures,
+ const THashMap<TBlobRange, TString>& srcBlobs, std::vector<TColumnRecord>& evictedRecords, std::vector<TString>& newBlobs,
+ TConstructionContext& context) const {
+ Y_VERIFY(portionInfo.TierName != evictFeatures.TargetTierName);
+
+ auto* tiering = context.GetTieringMap().FindPtr(evictFeatures.PathId);
+ Y_VERIFY(tiering);
+ auto compression = tiering->GetCompression(evictFeatures.TargetTierName);
+ if (!compression) {
+ // Nothing to recompress. We have no other kinds of evictions yet.
+ portionInfo.TierName = evictFeatures.TargetTierName;
+ evictFeatures.DataChanges = false;
+ return true;
+ }
+
+ Y_VERIFY(!evictFeatures.NeedExport);
+
+ TPortionInfo undo = portionInfo;
+
+ auto blobSchema = context.SchemaVersions.GetSchema(undo.GetSnapshot());
+ auto resultSchema = context.SchemaVersions.GetLastSchema();
+ auto batch = portionInfo.AssembleInBatch(*blobSchema, *resultSchema, srcBlobs);
+
+ size_t undoSize = newBlobs.size();
+ TSaverContext saverContext;
+ saverContext.SetTierName(evictFeatures.TargetTierName).SetExternalCompression(compression);
+ for (auto& rec : portionInfo.Records) {
+ auto pos = resultSchema->GetFieldIndex(rec.ColumnId);
+ Y_VERIFY(pos >= 0);
+ auto field = resultSchema->GetFieldByIndex(pos);
+ auto columnSaver = resultSchema->GetColumnSaver(rec.ColumnId, saverContext);
+
+ auto blob = TPortionInfo::SerializeColumn(batch->GetColumnByName(field->name()), field, columnSaver);
+ if (blob.size() >= TPortionInfo::BLOB_BYTES_LIMIT) {
+ portionInfo = undo;
+ newBlobs.resize(undoSize);
+ return false;
+ }
+ newBlobs.emplace_back(std::move(blob));
+ rec.BlobRange = TBlobRange{};
+ }
+
+ for (auto& rec : undo.Records) {
+ evictedRecords.emplace_back(std::move(rec));
+ }
+
+ portionInfo.AddMetadata(*resultSchema, batch, evictFeatures.TargetTierName);
+ return true;
+}
+
+NKikimr::TConclusion<std::vector<TString>> TTTLColumnEngineChanges::DoConstructBlobs(TConstructionContext& context) noexcept {
+ Y_VERIFY(!Blobs.empty()); // src data
+ Y_VERIFY(!PortionsToEvict.empty()); // src meta
+ Y_VERIFY(EvictedRecords.empty()); // dst meta
+
+ std::vector<TString> newBlobs;
+ std::vector<std::pair<TPortionInfo, TPortionEvictionFeatures>> evicted;
+ evicted.reserve(PortionsToEvict.size());
+
+ for (auto& [portionInfo, evictFeatures] : PortionsToEvict) {
+ Y_VERIFY(!portionInfo.Empty());
+ Y_VERIFY(portionInfo.IsActive());
+
+ if (UpdateEvictedPortion(portionInfo, evictFeatures, Blobs,
+ EvictedRecords, newBlobs, context)) {
+ Y_VERIFY(portionInfo.TierName == evictFeatures.TargetTierName);
+ evicted.emplace_back(std::move(portionInfo), evictFeatures);
+ }
+ }
+
+ PortionsToEvict.swap(evicted);
+ return newBlobs;
+}
+
}
diff --git a/ydb/core/tx/columnshard/engines/changes/ttl.h b/ydb/core/tx/columnshard/engines/changes/ttl.h
index ba740c08bf4..3562e9494ba 100644
--- a/ydb/core/tx/columnshard/engines/changes/ttl.h
+++ b/ydb/core/tx/columnshard/engines/changes/ttl.h
@@ -9,6 +9,10 @@ private:
using TBase = TCleanupColumnEngineChanges;
THashMap<TString, TPathIdBlobs> ExportTierBlobs;
ui64 ExportNo = 0;
+ bool UpdateEvictedPortion(TPortionInfo& portionInfo,
+ TPortionEvictionFeatures& evictFeatures, const THashMap<TBlobRange, TString>& srcBlobs,
+ std::vector<TColumnRecord>& evictedRecords, std::vector<TString>& newBlobs, TConstructionContext& context) const;
+
protected:
virtual void DoWriteIndexComplete(NColumnShard::TColumnShard& self, TWriteIndexCompleteContext& context) override;
virtual void DoCompile(TFinalizationContext& context) override;
@@ -17,6 +21,10 @@ protected:
virtual bool DoApplyChanges(TColumnEngineForLogs& self, TApplyChangesContext& context, const bool dryRun) override;
virtual void DoDebugString(TStringOutput& out) const override;
virtual void DoWriteIndex(NColumnShard::TColumnShard& self, TWriteIndexContext& context) override;
+ virtual TConclusion<std::vector<TString>> DoConstructBlobs(TConstructionContext& context) noexcept override;
+ virtual bool NeedConstruction() const override {
+ return PortionsToEvict.size();
+ }
public:
std::vector<TColumnRecord> EvictedRecords;
std::vector<std::pair<TPortionInfo, TPortionEvictionFeatures>> PortionsToEvict; // {portion, TPortionEvictionFeatures}
diff --git a/ydb/core/tx/columnshard/engines/changes/with_appended.cpp b/ydb/core/tx/columnshard/engines/changes/with_appended.cpp
index 27e7738362c..ac01441408a 100644
--- a/ydb/core/tx/columnshard/engines/changes/with_appended.cpp
+++ b/ydb/core/tx/columnshard/engines/changes/with_appended.cpp
@@ -2,6 +2,7 @@
#include <ydb/core/tx/columnshard/blob_cache.h>
#include <ydb/core/tx/columnshard/columnshard_impl.h>
#include <ydb/core/tx/columnshard/engines/column_engine_logs.h>
+#include <ydb/core/tx/columnshard/splitter/splitter.h>
namespace NKikimr::NOlap {
@@ -101,4 +102,47 @@ void TChangesWithAppend::DoCompile(TFinalizationContext& context) {
}
}
+std::vector<NKikimr::NOlap::TPortionInfo> TChangesWithAppend::MakeAppendedPortions(
+ const ui64 pathId, const std::shared_ptr<arrow::RecordBatch> batch, const ui64 granule, const TSnapshot& snapshot,
+ std::vector<TString>& blobs, const TGranuleMeta* granuleMeta, TConstructionContext& context) const {
+ Y_VERIFY(batch->num_rows());
+
+ auto resultSchema = context.SchemaVersions.GetSchema(snapshot);
+ std::vector<TPortionInfo> out;
+
+ TString tierName;
+ std::optional<NArrow::TCompression> compression;
+ if (pathId) {
+ if (auto* tiering = context.GetTieringMap().FindPtr(pathId)) {
+ tierName = tiering->GetHottestTierName();
+ if (const auto& tierCompression = tiering->GetCompression(tierName)) {
+ compression = *tierCompression;
+ }
+ }
+ }
+ TSaverContext saverContext;
+ saverContext.SetTierName(tierName).SetExternalCompression(compression);
+
+ TSplitLimiter limiter(granuleMeta, context.Counters, resultSchema, batch);
+
+ std::vector<TString> portionBlobs;
+ std::shared_ptr<arrow::RecordBatch> portionBatch;
+ while (limiter.Next(portionBlobs, portionBatch, saverContext)) {
+ TPortionInfo portionInfo;
+ portionInfo.Records.reserve(resultSchema->GetSchema()->num_fields());
+ for (auto&& f : resultSchema->GetSchema()->fields()) {
+ const ui32 columnId = resultSchema->GetIndexInfo().GetColumnId(f->name());
+ TColumnRecord record = TColumnRecord::Make(granule, columnId, snapshot, 0);
+ portionInfo.AppendOneChunkColumn(std::move(record));
+ }
+ for (auto&& i : portionBlobs) {
+ blobs.emplace_back(i);
+ }
+ portionInfo.AddMetadata(*resultSchema, portionBatch, tierName);
+ out.emplace_back(std::move(portionInfo));
+ }
+
+ return out;
+}
+
}
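As a usage sketch for MakeAppendedPortions, here is the consuming loop taken almost verbatim from the indexation code deleted later in this diff, updated for the extra TConstructionContext argument; inside the new change classes the same loop presumably runs as a member call from DoConstructBlobs, so `AppendedPortions` and `GetGranuleMeta()` refer to the concrete changes object:

    // Sketch: consuming MakeAppendedPortions (mirrors the removed index_logic_logs.cpp path).
    std::vector<TString> blobs;
    auto portions = MakeAppendedPortions(pathId, batch, granule, maxSnapshot, blobs,
                                         GetGranuleMeta(), context);
    Y_VERIFY(portions.size() > 0);
    for (auto& portion : portions) {
        AppendedPortions.emplace_back(std::move(portion));
    }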
diff --git a/ydb/core/tx/columnshard/engines/changes/with_appended.h b/ydb/core/tx/columnshard/engines/changes/with_appended.h
index 1a1b181a392..f258f9a834e 100644
--- a/ydb/core/tx/columnshard/engines/changes/with_appended.h
+++ b/ydb/core/tx/columnshard/engines/changes/with_appended.h
@@ -10,6 +10,12 @@ protected:
virtual void DoCompile(TFinalizationContext& context) override;
virtual bool DoApplyChanges(TColumnEngineForLogs& self, TApplyChangesContext& context, const bool dryRun) override;
virtual void DoWriteIndex(NColumnShard::TColumnShard& self, TWriteIndexContext& context) override;
+ std::vector<TPortionInfo> MakeAppendedPortions(const ui64 pathId,
+ const std::shared_ptr<arrow::RecordBatch> batch,
+ const ui64 granule,
+ const TSnapshot& snapshot,
+ std::vector<TString>& blobs, const TGranuleMeta* granuleMeta, TConstructionContext& context) const;
+
public:
std::vector<TPortionInfo> AppendedPortions; // New portions after indexing or compaction
THashMap<ui64, std::pair<ui64, TMark>> NewGranules; // granule -> {pathId, key}
diff --git a/ydb/core/tx/columnshard/engines/changes/ya.make b/ydb/core/tx/columnshard/engines/changes/ya.make
index 462b1c11518..dced5b30854 100644
--- a/ydb/core/tx/columnshard/engines/changes/ya.make
+++ b/ydb/core/tx/columnshard/engines/changes/ya.make
@@ -4,6 +4,8 @@ SRCS(
abstract.cpp
mark.cpp
compaction.cpp
+ in_granule_compaction.cpp
+ split_compaction.cpp
ttl.cpp
indexation.cpp
cleanup.cpp
@@ -16,6 +18,7 @@ PEERDIR(
ydb/core/formats/arrow
ydb/core/tx/columnshard/common
ydb/core/tx/columnshard/engines/insert_table
+ ydb/core/tx/columnshard/splitter
ydb/core/tablet_flat
ydb/core/tx/tiering
ydb/core/protos
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
index 5d2336daf95..9175a753612 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
@@ -1,12 +1,16 @@
#include "column_engine_logs.h"
#include "filter.h"
-#include "index_logic_logs.h"
#include "indexed_read_data.h"
#include <ydb/core/tx/columnshard/hooks/abstract/abstract.h>
#include <ydb/core/formats/arrow/one_batch_input_stream.h>
#include <ydb/core/formats/arrow/merging_sorted_input_stream.h>
#include <ydb/library/conclusion/status.h>
+#include "changes/indexation.h"
+#include "changes/in_granule_compaction.h"
+#include "changes/split_compaction.h"
+#include "changes/cleanup.h"
+#include "changes/ttl.h"
#include <concepts>
@@ -122,6 +126,35 @@ TConclusionStatus InitInGranuleMerge(const TMark& granuleMark, std::vector<TPort
} // namespace
+std::shared_ptr<NKikimr::NOlap::TCompactColumnEngineChanges> TColumnEngineForLogs::TChangesConstructor::BuildCompactionChanges(std::unique_ptr<TCompactionInfo>&& info,
+ const TCompactionLimits& limits, const TSnapshot& initSnapshot) {
+ std::shared_ptr<TCompactColumnEngineChanges> result;
+ if (info->InGranule()) {
+ result = std::make_shared<TInGranuleCompactColumnEngineChanges>(limits, std::move(info));
+ } else {
+ result = std::make_shared<TSplitCompactColumnEngineChanges>(limits, std::move(info));
+ }
+ result->InitSnapshot = initSnapshot;
+ return result;
+}
+
+std::shared_ptr<NKikimr::NOlap::TCleanupColumnEngineChanges> TColumnEngineForLogs::TChangesConstructor::BuildCleanupChanges(const TSnapshot& initSnapshot) {
+ auto changes = std::make_shared<TCleanupColumnEngineChanges>();
+ changes->InitSnapshot = initSnapshot;
+ return changes;
+}
+
+std::shared_ptr<NKikimr::NOlap::TTTLColumnEngineChanges> TColumnEngineForLogs::TChangesConstructor::BuildTtlChanges() {
+ return std::make_shared<TTTLColumnEngineChanges>();
+}
+
+std::shared_ptr<NKikimr::NOlap::TInsertColumnEngineChanges> TColumnEngineForLogs::TChangesConstructor::BuildInsertChanges(const TMark& defaultMark, std::vector<NOlap::TInsertedData>&& blobsToIndex, const TSnapshot& initSnapshot) {
+ auto changes = std::make_shared<TInsertColumnEngineChanges>(defaultMark);
+ changes->DataToIndex = std::move(blobsToIndex);
+ changes->InitSnapshot = initSnapshot;
+ return changes;
+}
+
TColumnEngineForLogs::TColumnEngineForLogs(ui64 tabletId, const TCompactionLimits& limits)
: GranulesStorage(std::make_shared<TGranulesStorage>(SignalCounters, limits))
, TabletId(tabletId)
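A short usage sketch of the TChangesConstructor facade defined above; `compactionInfo`, `limits` and `snapshot` are illustrative names, and the point is that callers no longer choose between in-granule and split compaction themselves:

    // Sketch: the facade picks the concrete compaction changes type internally.
    std::shared_ptr<TCompactColumnEngineChanges> compaction =
        TColumnEngineForLogs::TChangesConstructor::BuildCompactionChanges(
            std::move(compactionInfo), limits, snapshot);
    // TTL and cleanup changes are built through the same facade:
    auto ttl = TColumnEngineForLogs::TChangesConstructor::BuildTtlChanges();
    auto cleanup = TColumnEngineForLogs::TChangesConstructor::BuildCleanupChanges(snapshot);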
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.h b/ydb/core/tx/columnshard/engines/column_engine_logs.h
index e1fa5a799bd..54192815f65 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.h
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.h
@@ -6,10 +6,6 @@
#include <ydb/core/tx/columnshard/counters/engine_logs.h>
#include "storage/granule.h"
#include "storage/storage.h"
-#include "changes/indexation.h"
-#include "changes/compaction.h"
-#include "changes/cleanup.h"
-#include "changes/ttl.h"
namespace NKikimr::NArrow {
struct TSortDescription;
@@ -40,29 +36,14 @@ public:
class TChangesConstructor : public TColumnEngineChanges {
public:
static std::shared_ptr<TInsertColumnEngineChanges> BuildInsertChanges(const TMark& defaultMark,
- std::vector<NOlap::TInsertedData>&& blobsToIndex, const TSnapshot& initSnapshot) {
- auto changes = std::make_shared<TInsertColumnEngineChanges>(defaultMark);
- changes->DataToIndex = std::move(blobsToIndex);
- changes->InitSnapshot = initSnapshot;
- return changes;
- }
+ std::vector<NOlap::TInsertedData>&& blobsToIndex, const TSnapshot& initSnapshot);
static std::shared_ptr<TCompactColumnEngineChanges> BuildCompactionChanges(std::unique_ptr<TCompactionInfo>&& info,
- const TCompactionLimits& limits, const TSnapshot& initSnapshot) {
- auto changes = std::make_shared<TCompactColumnEngineChanges>(limits, std::move(info));
- changes->InitSnapshot = initSnapshot;
- return changes;
- }
+ const TCompactionLimits& limits, const TSnapshot& initSnapshot);
- static std::shared_ptr<TCleanupColumnEngineChanges> BuildCleanupChanges(const TSnapshot& initSnapshot) {
- auto changes = std::make_shared<TCleanupColumnEngineChanges>();
- changes->InitSnapshot = initSnapshot;
- return changes;
- }
+ static std::shared_ptr<TCleanupColumnEngineChanges> BuildCleanupChanges(const TSnapshot& initSnapshot);
- static std::shared_ptr<TTTLColumnEngineChanges> BuildTtlChanges() {
- return std::make_shared<TTTLColumnEngineChanges>();
- }
+ static std::shared_ptr<TTTLColumnEngineChanges> BuildTtlChanges();
};
enum ETableIdx {
diff --git a/ydb/core/tx/columnshard/engines/index_logic_logs.cpp b/ydb/core/tx/columnshard/engines/index_logic_logs.cpp
deleted file mode 100644
index ab3eb26d9b3..00000000000
--- a/ydb/core/tx/columnshard/engines/index_logic_logs.cpp
+++ /dev/null
@@ -1,803 +0,0 @@
-#include "index_logic_logs.h"
-
-#include <span>
-
-namespace NKikimr::NOlap {
-
-std::shared_ptr<arrow::RecordBatch> TIndexationLogic::AddSpecials(const std::shared_ptr<arrow::RecordBatch>& srcBatch,
- const TIndexInfo& indexInfo, const TInsertedData& inserted) const {
- auto batch = TIndexInfo::AddSpecialColumns(srcBatch, inserted.GetSnapshot());
- Y_VERIFY(batch);
-
- return NArrow::ExtractColumns(batch, indexInfo.ArrowSchemaWithSpecials());
-}
-
-bool TEvictionLogic::UpdateEvictedPortion(TPortionInfo& portionInfo,
- TPortionEvictionFeatures& evictFeatures, const THashMap<TBlobRange, TString>& srcBlobs,
- std::vector<TColumnRecord>& evictedRecords, std::vector<TString>& newBlobs) const {
- Y_VERIFY(portionInfo.TierName != evictFeatures.TargetTierName);
-
- auto* tiering = GetTieringMap().FindPtr(evictFeatures.PathId);
- Y_VERIFY(tiering);
- auto compression = tiering->GetCompression(evictFeatures.TargetTierName);
- if (!compression) {
- // Nothing to recompress. We have no other kinds of evictions yet.
- portionInfo.TierName = evictFeatures.TargetTierName;
- evictFeatures.DataChanges = false;
- return true;
- }
-
- Y_VERIFY(!evictFeatures.NeedExport);
-
- TPortionInfo undo = portionInfo;
-
- auto blobSchema = SchemaVersions.GetSchema(undo.GetSnapshot());
- auto resultSchema = SchemaVersions.GetLastSchema();
- auto batch = portionInfo.AssembleInBatch(*blobSchema, *resultSchema, srcBlobs);
-
- size_t undoSize = newBlobs.size();
- TSaverContext saverContext;
- saverContext.SetTierName(evictFeatures.TargetTierName).SetExternalCompression(compression);
- for (auto& rec : portionInfo.Records) {
- auto pos = resultSchema->GetFieldIndex(rec.ColumnId);
- Y_VERIFY(pos >= 0);
- auto field = resultSchema->GetFieldByIndex(pos);
- auto columnSaver = resultSchema->GetColumnSaver(rec.ColumnId, saverContext);
-
- auto blob = TPortionInfo::SerializeColumn(batch->GetColumnByName(field->name()), field, columnSaver);
- if (blob.size() >= TPortionInfo::BLOB_BYTES_LIMIT) {
- portionInfo = undo;
- newBlobs.resize(undoSize);
- return false;
- }
- newBlobs.emplace_back(std::move(blob));
- rec.BlobRange = TBlobRange{};
- }
-
- for (auto& rec : undo.Records) {
- evictedRecords.emplace_back(std::move(rec));
- }
-
- portionInfo.AddMetadata(*resultSchema, batch, evictFeatures.TargetTierName);
- return true;
-}
-
-class TSplitLimiter {
-private:
- static const inline double ReduceCorrectionKff = 0.9;
- static const inline double IncreaseCorrectionKff = 1.1;
- static const inline ui64 ExpectedBlobSize = 6 * 1024 * 1024;
- static const inline ui64 MinBlobSize = 1 * 1024 * 1024;
-
- const NColumnShard::TIndexationCounters Counters;
- ui32 BaseStepRecordsCount = 0;
- ui32 CurrentStepRecordsCount = 0;
- std::shared_ptr<arrow::RecordBatch> Batch;
- std::vector<TColumnSummary> SortedColumnIds;
- ui32 Position = 0;
- ISnapshotSchema::TPtr Schema;
-public:
- TSplitLimiter(const TGranuleMeta* granuleMeta, const NColumnShard::TIndexationCounters& counters,
- ISnapshotSchema::TPtr schema, const std::shared_ptr<arrow::RecordBatch> batch)
- : Counters(counters)
- , Batch(batch)
- , Schema(schema)
- {
- if (granuleMeta && granuleMeta->GetAdditiveSummary().GetOther().GetRecordsCount()) {
- Y_VERIFY(granuleMeta->GetHardSummary().GetColumnIdsSortedBySizeDescending().size());
- SortedColumnIds = granuleMeta->GetHardSummary().GetColumnIdsSortedBySizeDescending();
- const auto biggestColumn = SortedColumnIds.front();
- Y_VERIFY(biggestColumn.GetPackedBlobsSize());
- const double expectedPackedRecordSize = 1.0 * biggestColumn.GetPackedBlobsSize() / granuleMeta->GetAdditiveSummary().GetOther().GetRecordsCount();
- BaseStepRecordsCount = ExpectedBlobSize / expectedPackedRecordSize;
- for (ui32 i = 1; i < SortedColumnIds.size(); ++i) {
- Y_VERIFY(SortedColumnIds[i - 1].GetPackedBlobsSize() >= SortedColumnIds[i].GetPackedBlobsSize());
- }
- if (BaseStepRecordsCount > batch->num_rows()) {
- BaseStepRecordsCount = batch->num_rows();
- } else {
- BaseStepRecordsCount = batch->num_rows() / (ui32)(batch->num_rows() / BaseStepRecordsCount);
- if (BaseStepRecordsCount * expectedPackedRecordSize > TCompactionLimits::MAX_BLOB_SIZE) {
- BaseStepRecordsCount = ExpectedBlobSize / expectedPackedRecordSize;
- }
- }
- } else {
- for (auto&& i : Schema->GetIndexInfo().GetColumnIds()) {
- SortedColumnIds.emplace_back(TColumnSummary(i));
- }
- BaseStepRecordsCount = batch->num_rows();
- }
- BaseStepRecordsCount = std::min<ui32>(BaseStepRecordsCount, Batch->num_rows());
- Y_VERIFY(BaseStepRecordsCount);
- CurrentStepRecordsCount = BaseStepRecordsCount;
- }
-
- bool Next(std::vector<TString>& portionBlobs, std::shared_ptr<arrow::RecordBatch>& batch, const TSaverContext& saverContext) {
- if (Position == Batch->num_rows()) {
- return false;
- }
-
- portionBlobs.resize(Schema->GetSchema()->num_fields());
- while (true) {
- Y_VERIFY(Position < Batch->num_rows());
- std::shared_ptr<arrow::RecordBatch> currentBatch;
- if (Batch->num_rows() - Position < CurrentStepRecordsCount * 1.1) {
- currentBatch = Batch->Slice(Position, Batch->num_rows() - Position);
- } else {
- currentBatch = Batch->Slice(Position, CurrentStepRecordsCount);
- }
-
- ui32 fillCounter = 0;
- for (const auto& columnSummary : SortedColumnIds) {
- const TString& columnName = Schema->GetIndexInfo().GetColumnName(columnSummary.GetColumnId());
- const int idx = Schema->GetFieldIndex(columnSummary.GetColumnId());
- Y_VERIFY(idx >= 0);
- auto field = Schema->GetFieldByIndex(idx);
- Y_VERIFY(field);
- auto array = currentBatch->GetColumnByName(columnName);
- Y_VERIFY(array);
- auto columnSaver = Schema->GetColumnSaver(columnSummary.GetColumnId(), saverContext);
- TString blob = TPortionInfo::SerializeColumn(array, field, columnSaver);
- if (blob.size() >= TCompactionLimits::MAX_BLOB_SIZE) {
- Counters.TrashDataSerializationBytes->Add(blob.size());
- Counters.TrashDataSerialization->Add(1);
- Counters.TrashDataSerializationHistogramBytes->Collect(blob.size());
- const double kffNew = 1.0 * ExpectedBlobSize / blob.size() * ReduceCorrectionKff;
- CurrentStepRecordsCount = currentBatch->num_rows() * kffNew;
- Y_VERIFY(CurrentStepRecordsCount);
- break;
- } else {
- Counters.CorrectDataSerializationBytes->Add(blob.size());
- Counters.CorrectDataSerialization->Add(1);
- }
-
- portionBlobs[idx] = std::move(blob);
- ++fillCounter;
- }
-
- if (fillCounter == portionBlobs.size()) {
- Y_VERIFY(fillCounter == portionBlobs.size());
- Position += currentBatch->num_rows();
- Y_VERIFY(Position <= Batch->num_rows());
- ui64 maxBlobSize = 0;
- for (auto&& i : portionBlobs) {
- Counters.SplittedPortionColumnSize->Collect(i.size());
- maxBlobSize = std::max<ui64>(maxBlobSize, i.size());
- }
- batch = currentBatch;
- if (maxBlobSize < MinBlobSize) {
- if ((Position != currentBatch->num_rows() || Position != Batch->num_rows())) {
- Counters.SplittedPortionLargestColumnSize->Collect(maxBlobSize);
- Counters.TooSmallBlob->Add(1);
- if (Position == Batch->num_rows()) {
- Counters.TooSmallBlobFinish->Add(1);
- }
- if (Position == currentBatch->num_rows()) {
- Counters.TooSmallBlobStart->Add(1);
- }
- } else {
- Counters.SimpleSplitPortionLargestColumnSize->Collect(maxBlobSize);
- }
- CurrentStepRecordsCount = currentBatch->num_rows() * IncreaseCorrectionKff;
- } else {
- Counters.SplittedPortionLargestColumnSize->Collect(maxBlobSize);
- }
- return true;
- }
- }
- }
-};
-
-std::vector<TPortionInfo> TIndexLogicBase::MakeAppendedPortions(const ui64 pathId,
- const std::shared_ptr<arrow::RecordBatch> batch,
- const ui64 granule,
- const TSnapshot& snapshot,
- std::vector<TString>& blobs, const TGranuleMeta* granuleMeta) const {
- Y_VERIFY(batch->num_rows());
-
- auto resultSchema = SchemaVersions.GetSchema(snapshot);
- std::vector<TPortionInfo> out;
-
- TString tierName;
- std::optional<NArrow::TCompression> compression;
- if (pathId) {
- if (auto* tiering = GetTieringMap().FindPtr(pathId)) {
- tierName = tiering->GetHottestTierName();
- if (const auto& tierCompression = tiering->GetCompression(tierName)) {
- compression = *tierCompression;
- }
- }
- }
- TSaverContext saverContext;
- saverContext.SetTierName(tierName).SetExternalCompression(compression);
-
- TSplitLimiter limiter(granuleMeta, Counters, resultSchema, batch);
-
- std::vector<TString> portionBlobs;
- std::shared_ptr<arrow::RecordBatch> portionBatch;
- while (limiter.Next(portionBlobs, portionBatch, saverContext)) {
- TPortionInfo portionInfo;
- portionInfo.Records.reserve(resultSchema->GetSchema()->num_fields());
- for (auto&& f : resultSchema->GetSchema()->fields()) {
- const ui32 columnId = resultSchema->GetIndexInfo().GetColumnId(f->name());
- TColumnRecord record = TColumnRecord::Make(granule, columnId, snapshot, 0);
- portionInfo.AppendOneChunkColumn(std::move(record));
- }
- for (auto&& i : portionBlobs) {
- blobs.emplace_back(i);
- }
- portionInfo.AddMetadata(*resultSchema, portionBatch, tierName);
- out.emplace_back(std::move(portionInfo));
- }
-
- return out;
-}
-
-std::pair<std::vector<std::shared_ptr<arrow::RecordBatch>>, TSnapshot>
-TCompactionLogic::PortionsToBatches(const std::vector<TPortionInfo>& portions, const THashMap<TBlobRange, TString>& blobs,
- const bool insertedOnly) const {
- std::vector<std::shared_ptr<arrow::RecordBatch>> batches;
- batches.reserve(portions.size());
- const auto resultSchema = SchemaVersions.GetLastSchema();
- TSnapshot maxSnapshot = resultSchema->GetSnapshot();
- for (const auto& portionInfo : portions) {
- if (!insertedOnly || portionInfo.IsInserted()) {
- const auto blobSchema = SchemaVersions.GetSchema(portionInfo.GetSnapshot());
-
- batches.push_back(portionInfo.AssembleInBatch(*blobSchema, *resultSchema, blobs));
-
- if (maxSnapshot < portionInfo.GetSnapshot()) {
- maxSnapshot = portionInfo.GetSnapshot();
- }
- }
- }
- return std::make_pair(std::move(batches), maxSnapshot);
-}
-
-TConclusion<std::vector<TString>> TIndexationLogic::DoApply(std::shared_ptr<TColumnEngineChanges> indexChanges) const noexcept {
- auto changes = std::dynamic_pointer_cast<TInsertColumnEngineChanges>(indexChanges);
- Y_VERIFY(changes);
- Y_VERIFY(!changes->DataToIndex.empty());
- Y_VERIFY(changes->AppendedPortions.empty());
-
- auto maxSnapshot = TSnapshot::Zero();
- for (auto& inserted : changes->DataToIndex) {
- TSnapshot insertSnap = inserted.GetSnapshot();
- Y_VERIFY(insertSnap.Valid());
- if (insertSnap > maxSnapshot) {
- maxSnapshot = insertSnap;
- }
- }
- Y_VERIFY(maxSnapshot.Valid());
-
- auto resultSchema = SchemaVersions.GetSchema(maxSnapshot);
- Y_VERIFY(resultSchema->GetIndexInfo().IsSorted());
-
- THashMap<ui64, std::vector<std::shared_ptr<arrow::RecordBatch>>> pathBatches;
- for (auto& inserted : changes->DataToIndex) {
- TBlobRange blobRange(inserted.BlobId, 0, inserted.BlobId.BlobSize());
-
- auto blobSchema = SchemaVersions.GetSchema(inserted.GetSchemaSnapshot());
- auto& indexInfo = blobSchema->GetIndexInfo();
- Y_VERIFY(indexInfo.IsSorted());
-
- std::shared_ptr<arrow::RecordBatch> batch;
- if (auto it = changes->CachedBlobs.find(inserted.BlobId); it != changes->CachedBlobs.end()) {
- batch = it->second;
- } else if (auto* blobData = changes->Blobs.FindPtr(blobRange)) {
- Y_VERIFY(!blobData->empty(), "Blob data not present");
- // Prepare batch
- batch = NArrow::DeserializeBatch(*blobData, indexInfo.ArrowSchema());
- if (!batch) {
- AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)
- ("event", "cannot_parse")
- ("data_snapshot", TStringBuilder() << inserted.GetSnapshot())
- ("index_snapshot", TStringBuilder() << blobSchema->GetSnapshot());
- }
- } else {
- Y_VERIFY(blobData, "Data for range %s has not been read", blobRange.ToString().c_str());
- }
- Y_VERIFY(batch);
-
- batch = AddSpecials(batch, blobSchema->GetIndexInfo(), inserted);
- batch = resultSchema->NormalizeBatch(*blobSchema, batch);
- pathBatches[inserted.PathId].push_back(batch);
- Y_VERIFY_DEBUG(NArrow::IsSorted(pathBatches[inserted.PathId].back(), resultSchema->GetIndexInfo().GetReplaceKey()));
- }
- std::vector<TString> blobs;
-
- for (auto& [pathId, batches] : pathBatches) {
- changes->AddPathIfNotExists(pathId);
-
- // We could merge the data here because the tablet limits the size of indexed data portions
- auto merged = NArrow::CombineSortedBatches(batches, resultSchema->GetIndexInfo().SortReplaceDescription());
- Y_VERIFY(merged);
- Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(merged, resultSchema->GetIndexInfo().GetReplaceKey()));
-
- auto granuleBatches = TMarksGranules::SliceIntoGranules(merged, changes->PathToGranule[pathId], resultSchema->GetIndexInfo());
- for (auto& [granule, batch] : granuleBatches) {
- auto portions = MakeAppendedPortions(pathId, batch, granule, maxSnapshot, blobs, changes->GetGranuleMeta());
- Y_VERIFY(portions.size() > 0);
- for (auto& portion : portions) {
- changes->AppendedPortions.emplace_back(std::move(portion));
- }
- }
- }
-
- Y_VERIFY(changes->PathToGranule.size() == pathBatches.size());
- return blobs;
-}
-
-std::pair<std::shared_ptr<arrow::RecordBatch>, TSnapshot> TCompactionLogic::CompactInOneGranule(ui64 granule,
- const std::vector<TPortionInfo>& portions,
- const THashMap<TBlobRange, TString>& blobs) const {
- std::vector<std::shared_ptr<arrow::RecordBatch>> batches;
- batches.reserve(portions.size());
-
- auto resultSchema = SchemaVersions.GetLastSchema();
-
- TSnapshot maxSnapshot = resultSchema->GetSnapshot();
- for (auto& portionInfo : portions) {
- Y_VERIFY(!portionInfo.Empty());
- Y_VERIFY(portionInfo.Granule() == granule);
- auto blobSchema = SchemaVersions.GetSchema(portionInfo.GetSnapshot());
- auto batch = portionInfo.AssembleInBatch(*blobSchema, *resultSchema, blobs);
- batches.push_back(batch);
- if (portionInfo.GetSnapshot() > maxSnapshot) {
- maxSnapshot = portionInfo.GetSnapshot();
- }
- }
-
- auto sortedBatch = NArrow::CombineSortedBatches(batches, resultSchema->GetIndexInfo().SortReplaceDescription());
- Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(sortedBatch, resultSchema->GetIndexInfo().GetReplaceKey()));
-
- return std::make_pair(sortedBatch, maxSnapshot);
-}
-
-std::vector<TString> TCompactionLogic::CompactInGranule(std::shared_ptr<TCompactColumnEngineChanges> changes) const {
- const ui64 pathId = changes->SrcGranule->PathId;
- std::vector<TString> blobs;
- auto& switchedPortions = changes->SwitchedPortions;
- Y_VERIFY(switchedPortions.size());
-
- ui64 granule = switchedPortions[0].Granule();
- auto [batch, maxSnapshot] = CompactInOneGranule(granule, switchedPortions, changes->Blobs);
-
- auto resultSchema = SchemaVersions.GetLastSchema();
- std::vector<TPortionInfo> portions;
- if (!changes->MergeBorders.Empty()) {
- Y_VERIFY(changes->MergeBorders.GetOrderedMarks().size() > 1);
- auto slices = changes->MergeBorders.SliceIntoGranules(batch, resultSchema->GetIndexInfo());
- portions.reserve(slices.size());
-
- for (auto& [_, slice] : slices) {
- if (!slice || slice->num_rows() == 0) {
- continue;
- }
- auto tmp = MakeAppendedPortions(pathId, slice, granule, maxSnapshot, blobs, changes->GetGranuleMeta());
- for (auto&& portionInfo : tmp) {
- portions.emplace_back(std::move(portionInfo));
- }
- }
- } else {
- portions = MakeAppendedPortions(pathId, batch, granule, maxSnapshot, blobs, changes->GetGranuleMeta());
- }
-
- Y_VERIFY(portions.size() > 0);
- Y_VERIFY(changes->AppendedPortions.empty());
- // Set appended portions.
- changes->AppendedPortions.swap(portions);
-
- return blobs;
-}
-
-std::vector<std::pair<TMark, std::shared_ptr<arrow::RecordBatch>>>
-TCompactionLogic::SliceGranuleBatches(const TIndexInfo& indexInfo,
- const TCompactColumnEngineChanges& changes,
- const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches,
- const TMark& ts0) const {
- std::vector<std::pair<TMark, std::shared_ptr<arrow::RecordBatch>>> out;
-
- // Extract unique effective keys and their counts
- i64 numRows = 0;
- TMap<NArrow::TReplaceKey, ui32> uniqKeyCount;
- for (const auto& batch : batches) {
- Y_VERIFY(batch);
- if (batch->num_rows() == 0) {
- continue;
- }
-
- numRows += batch->num_rows();
-
- const auto effKey = TMarksGranules::GetEffectiveKey(batch, indexInfo);
- Y_VERIFY(effKey->num_columns() && effKey->num_rows());
-
- auto effColumns = std::make_shared<NArrow::TArrayVec>(effKey->columns());
- for (int row = 0; row < effKey->num_rows(); ++row) {
- ++uniqKeyCount[NArrow::TReplaceKey(effColumns, row)];
- }
- }
-
- Y_VERIFY(uniqKeyCount.size());
- auto minTs = uniqKeyCount.begin()->first;
- auto maxTs = uniqKeyCount.rbegin()->first;
- Y_VERIFY(minTs >= ts0.GetBorder());
-
- // This is an estimate of the needed count, since numRows is calculated before key replacement
- ui32 rowsInGranule = numRows / changes.NumSplitInto(numRows);
- Y_VERIFY(rowsInGranule);
-
- // Cannot split in case of one unique key
- if (uniqKeyCount.size() == 1) {
- // We have to split a big batch of the same key into several portions
- auto merged = NArrow::MergeSortedBatches(batches, indexInfo.SortReplaceDescription(), rowsInGranule);
- for (auto& batch : merged) {
- Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(batch, indexInfo.GetReplaceKey()));
- out.emplace_back(ts0, batch);
- }
- return out;
- }
-
- // Make split borders from uniq keys
- std::vector<NArrow::TReplaceKey> borders;
- borders.reserve(numRows / rowsInGranule);
- {
- ui32 sumRows = 0;
- for (auto& [ts, num] : uniqKeyCount) {
- if (sumRows >= rowsInGranule) {
- borders.emplace_back(ts);
- sumRows = 0;
- }
- sumRows += num;
- }
- if (borders.empty()) {
- borders.emplace_back(maxTs); // huge trailing key
- }
- Y_VERIFY(borders.size());
- }
-
- // Find offsets in source batches
- std::vector<std::vector<int>> offsets(batches.size()); // vec[batch][border] = offset
- for (size_t i = 0; i < batches.size(); ++i) {
- const auto& batch = batches[i];
- auto& batchOffsets = offsets[i];
- batchOffsets.reserve(borders.size() + 1);
-
- const auto effKey = TMarksGranules::GetEffectiveKey(batch, indexInfo);
- Y_VERIFY(effKey->num_columns() && effKey->num_rows());
-
- std::vector<NArrow::TRawReplaceKey> keys;
- {
- const auto& columns = effKey->columns();
- keys.reserve(effKey->num_rows());
- for (i64 i = 0; i < effKey->num_rows(); ++i) {
- keys.emplace_back(NArrow::TRawReplaceKey(&columns, i));
- }
- }
-
- batchOffsets.push_back(0);
- for (const auto& border : borders) {
- int offset = NArrow::LowerBound(keys, border, batchOffsets.back());
- Y_VERIFY(offset >= batchOffsets.back());
- Y_VERIFY(offset <= batch->num_rows());
- batchOffsets.push_back(offset);
- }
-
- Y_VERIFY(batchOffsets.size() == borders.size() + 1);
- }
-
- // Make a merge-sorted granule batch for each split granule
- for (ui32 granuleNo = 0; granuleNo < borders.size() + 1; ++granuleNo) {
- std::vector<std::shared_ptr<arrow::RecordBatch>> granuleBatches;
- granuleBatches.reserve(batches.size());
- const bool lastGranule = (granuleNo == borders.size());
-
- // Extract granule: slice source batches with offsets
- i64 granuleNumRows = 0;
- for (size_t i = 0; i < batches.size(); ++i) {
- const auto& batch = batches[i];
- auto& batchOffsets = offsets[i];
-
- int offset = batchOffsets[granuleNo];
- int end = lastGranule ? batch->num_rows() : batchOffsets[granuleNo + 1];
- int size = end - offset;
- Y_VERIFY(size >= 0);
-
- if (size) {
- Y_VERIFY(offset < batch->num_rows());
- auto slice = batch->Slice(offset, size);
- Y_VERIFY(slice->num_rows());
- granuleNumRows += slice->num_rows();
-#if 1 // Check correctness
- const auto effKey = TMarksGranules::GetEffectiveKey(slice, indexInfo);
- Y_VERIFY(effKey->num_columns() && effKey->num_rows());
-
- auto startKey = granuleNo ? borders[granuleNo - 1] : minTs;
- Y_VERIFY(NArrow::TReplaceKey::FromBatch(effKey, 0) >= startKey);
-
- NArrow::TReplaceKey lastSliceKey = NArrow::TReplaceKey::FromBatch(effKey, effKey->num_rows() - 1);
- if (granuleNo < borders.size() - 1) {
- const auto& endKey = borders[granuleNo];
- Y_VERIFY(lastSliceKey < endKey);
- } else {
- Y_VERIFY(lastSliceKey <= maxTs);
- }
-#endif
- Y_VERIFY_DEBUG(NArrow::IsSorted(slice, indexInfo.GetReplaceKey()));
- granuleBatches.emplace_back(slice);
- }
- }
-
- // Merge slices. We have to split big single-key batches into several ones here.
- if (granuleNumRows > 4 * rowsInGranule) {
- granuleNumRows = rowsInGranule;
- }
- auto merged = NArrow::MergeSortedBatches(granuleBatches, indexInfo.SortReplaceDescription(), granuleNumRows);
- for (auto& batch : merged) {
- Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(batch, indexInfo.GetReplaceKey()));
-
- auto startKey = ts0.GetBorder();
- if (granuleNo) {
- startKey = borders[granuleNo - 1];
- }
-#if 1 // Check correctness
- const auto effKey = TMarksGranules::GetEffectiveKey(batch, indexInfo);
- Y_VERIFY(effKey->num_columns() && effKey->num_rows());
-
- Y_VERIFY(NArrow::TReplaceKey::FromBatch(effKey, 0) >= startKey);
-#endif
- out.emplace_back(TMark(startKey), batch);
- }
- }
-
- return out;
-}
-
-ui64 TCompactionLogic::TryMovePortions(const TMark& ts0,
- std::vector<TPortionInfo>& portions,
- std::vector<std::pair<TMark, ui64>>& tsIds,
- std::vector<std::pair<TPortionInfo, ui64>>& toMove) const {
- std::vector<TPortionInfo*> partitioned(portions.size());
- // Split portions by putting the inserted portions in the original order
- // at the beginning of the buffer and the compacted portions at the end.
- // The compacted portions will be put in reversed order, but they will be sorted later.
- const auto [inserted, compacted] = [&]() {
- size_t l = 0;
- size_t r = portions.size();
-
- for (auto& portionInfo : portions) {
- partitioned[(portionInfo.IsInserted() ? l++ : --r)] = &portionInfo;
- }
-
- return std::make_tuple(std::span(partitioned.begin(), l), std::span(partitioned.begin() + l, partitioned.end()));
- }();
-
- Counters.AnalizeCompactedPortions->Add(compacted.size());
- Counters.AnalizeInsertedPortions->Add(inserted.size());
- for (auto&& i : portions) {
- if (i.IsInserted()) {
- Counters.AnalizeInsertedBytes->Add(i.BlobsBytes());
- } else {
- Counters.AnalizeCompactedBytes->Add(i.BlobsBytes());
- }
- }
-
- // Do nothing if there are fewer than two compacted portions.
- if (compacted.size() < 2) {
- return 0;
- }
- // Order compacted portions by primary key.
- std::sort(compacted.begin(), compacted.end(), [](const TPortionInfo* a, const TPortionInfo* b) {
- return a->IndexKeyStart() < b->IndexKeyStart();
- });
- for (auto&& i : inserted) {
- Counters.RepackedInsertedPortionBytes->Add(i->BlobsBytes());
- }
- Counters.RepackedInsertedPortions->Add(inserted.size());
-
- // Check that there are no gaps between two adjacent portions in terms of primary key range.
- for (size_t i = 0; i < compacted.size() - 1; ++i) {
- if (compacted[i]->IndexKeyEnd() >= compacted[i + 1]->IndexKeyStart()) {
- for (auto&& c : compacted) {
- Counters.SkipPortionBytesMoveThroughIntersection->Add(c->BlobsBytes());
- }
-
- Counters.SkipPortionsMoveThroughIntersection->Add(compacted.size());
- Counters.RepackedCompactedPortions->Add(compacted.size());
- return 0;
- }
- }
-
- toMove.reserve(compacted.size());
- ui64 numRows = 0;
- ui32 counter = 0;
- for (auto* portionInfo : compacted) {
- ui32 rows = portionInfo->NumRows();
- Y_VERIFY(rows);
- numRows += rows;
- Counters.MovedPortionBytes->Add(portionInfo->BlobsBytes());
- tsIds.emplace_back((counter ? TMark(portionInfo->IndexKeyStart()) : ts0), counter + 1);
- toMove.emplace_back(std::move(*portionInfo), counter);
- ++counter;
- // Ensure that std::move will take effect.
- static_assert(std::swappable<decltype(*portionInfo)>);
- }
- Counters.MovedPortions->Add(toMove.size());
-
- std::vector<TPortionInfo> out;
- out.reserve(inserted.size());
- for (auto* portionInfo : inserted) {
- out.emplace_back(std::move(*portionInfo));
- // Ensure that std::move will take effect.
- static_assert(std::swappable<decltype(*portionInfo)>);
- }
- portions.swap(out);
-
- return numRows;
-}
-
-std::vector<TString> TCompactionLogic::CompactSplitGranule(const std::shared_ptr<TCompactColumnEngineChanges>& changes) const {
- const ui64 pathId = changes->SrcGranule->PathId;
- const TMark ts0 = changes->SrcGranule->Mark;
- std::vector<TPortionInfo>& portions = changes->SwitchedPortions;
-
- std::vector<std::pair<TMark, ui64>> tsIds;
- ui64 movedRows = TryMovePortions(ts0, portions, tsIds, changes->PortionsToMove);
- auto [srcBatches, maxSnapshot] = PortionsToBatches(portions, changes->Blobs, movedRows != 0);
- Y_VERIFY(srcBatches.size() == portions.size());
-
- std::vector<TString> blobs;
- auto resultSchema = SchemaVersions.GetLastSchema();
- if (movedRows) {
- Y_VERIFY(changes->PortionsToMove.size() >= 2);
- Y_VERIFY(changes->PortionsToMove.size() == tsIds.size());
- Y_VERIFY(tsIds.begin()->first == ts0);
-
- // Calculate total number of rows.
- ui64 numRows = movedRows;
- for (const auto& batch : srcBatches) {
- numRows += batch->num_rows();
- }
-
- // Recalculate the new granules' borders (if they are larger than the portions)
- ui32 numSplitInto = changes->NumSplitInto(numRows);
- if (numSplitInto < tsIds.size()) {
- const ui32 rowsInGranule = numRows / numSplitInto;
- Y_VERIFY(rowsInGranule);
-
- std::vector<std::pair<TMark, ui64>> newTsIds;
- ui32 tmpGranule = 0;
- ui32 sumRows = 0;
- // Always insert mark of the source granule at the beginning.
- newTsIds.emplace_back(ts0, 1);
-
- for (size_t i = 0, end = tsIds.size(); i != end; ++i) {
- const TMark& ts = tsIds[i].first;
- // Make a new granule if the current number of rows exceeds the allowed number of rows in the granule,
- // or we are at the end of the ids and nothing has been inserted so far.
- if (sumRows >= rowsInGranule || (i + 1 == end && newTsIds.size() == 1)) {
- ++tmpGranule;
- newTsIds.emplace_back(ts, tmpGranule + 1);
- sumRows = 0;
- }
-
- auto& toMove = changes->PortionsToMove[i];
- sumRows += toMove.first.NumRows();
- toMove.second = tmpGranule;
- }
-
- tsIds.swap(newTsIds);
- }
- Y_VERIFY(tsIds.size() > 1);
- Y_VERIFY(tsIds[0] == std::make_pair(ts0, ui64(1)));
- TMarksGranules marksGranules(std::move(tsIds));
-
- // Slice inserted portions with granules' borders
- THashMap<ui64, std::vector<std::shared_ptr<arrow::RecordBatch>>> idBatches;
- std::vector<TPortionInfo*> toSwitch;
- toSwitch.reserve(portions.size());
- for (size_t i = 0; i < portions.size(); ++i) {
- auto& portion = portions[i];
- auto& batch = srcBatches[i];
- auto slices = marksGranules.SliceIntoGranules(batch, resultSchema->GetIndexInfo());
-
- THashSet<ui64> ids;
- for (auto& [id, slice] : slices) {
- if (slice && slice->num_rows()) {
- ids.insert(id);
- idBatches[id].emplace_back(std::move(slice));
- }
- }
-
- // Optimization: move inserted portions that were not split. Do not reappend them.
- if (ids.size() == 1) {
- ui64 id = *ids.begin();
- idBatches[id].resize(idBatches[id].size() - 1);
- ui64 tmpGranule = id - 1;
- changes->PortionsToMove.emplace_back(std::move(portion), tmpGranule);
- } else {
- toSwitch.push_back(&portion);
- }
- }
-
- // Update switchedPortions if we have moves
- if (toSwitch.size() != portions.size()) {
- std::vector<TPortionInfo> tmp;
- tmp.reserve(toSwitch.size());
- for (auto* portionInfo : toSwitch) {
- tmp.emplace_back(std::move(*portionInfo));
- }
- portions.swap(tmp);
- }
-
- for (const auto& [mark, id] : marksGranules.GetOrderedMarks()) {
- ui64 tmpGranule = changes->SetTmpGranule(pathId, mark);
- for (const auto& batch : idBatches[id]) {
- // Cannot set the snapshot here. It will be set by the committing transaction in ApplyChanges().
- auto newPortions = MakeAppendedPortions(pathId, batch, tmpGranule, maxSnapshot, blobs, changes->GetGranuleMeta());
- Y_VERIFY(newPortions.size() > 0);
- for (auto& portion : newPortions) {
- changes->AppendedPortions.emplace_back(std::move(portion));
- }
- }
- }
- } else {
- auto batches = SliceGranuleBatches(resultSchema->GetIndexInfo(), *changes, srcBatches, ts0);
-
- changes->SetTmpGranule(pathId, ts0);
- for (auto& [ts, batch] : batches) {
- // The tmp granule will be updated to the correct value in ApplyChanges()
- ui64 tmpGranule = changes->SetTmpGranule(pathId, ts);
-
- // Cannot set the snapshot here. It will be set by the committing transaction in ApplyChanges().
- auto portions = MakeAppendedPortions(pathId, batch, tmpGranule, maxSnapshot, blobs, changes->GetGranuleMeta());
- Y_VERIFY(portions.size() > 0);
- for (auto& portion : portions) {
- changes->AppendedPortions.emplace_back(std::move(portion));
- }
- }
- }
-
- return blobs;
-}
-
-TConclusion<std::vector<TString>> TCompactionLogic::DoApply(std::shared_ptr<TColumnEngineChanges> changes) const noexcept {
- auto castedChanges = std::dynamic_pointer_cast<TCompactColumnEngineChanges>(changes);
- Y_VERIFY(castedChanges);
- Y_VERIFY(!castedChanges->Blobs.empty()); // src data
- Y_VERIFY(!castedChanges->SwitchedPortions.empty()); // src meta
- Y_VERIFY(castedChanges->AppendedPortions.empty()); // dst meta
-
- if (castedChanges->CompactionInfo->InGranule()) {
- return CompactInGranule(castedChanges);
- } else {
- return CompactSplitGranule(castedChanges);
- }
-}
-
-TConclusion<std::vector<TString>> TEvictionLogic::DoApply(std::shared_ptr<TColumnEngineChanges> changesExt) const noexcept {
- auto changes = std::dynamic_pointer_cast<TTTLColumnEngineChanges>(changesExt);
- Y_VERIFY(changes);
- Y_VERIFY(!changes->Blobs.empty()); // src data
- Y_VERIFY(!changes->PortionsToEvict.empty()); // src meta
- Y_VERIFY(changes->EvictedRecords.empty()); // dst meta
-
- std::vector<TString> newBlobs;
- std::vector<std::pair<TPortionInfo, TPortionEvictionFeatures>> evicted;
- evicted.reserve(changes->PortionsToEvict.size());
-
- for (auto& [portionInfo, evictFeatures] : changes->PortionsToEvict) {
- Y_VERIFY(!portionInfo.Empty());
- Y_VERIFY(portionInfo.IsActive());
-
- if (UpdateEvictedPortion(portionInfo, evictFeatures, changes->Blobs,
- changes->EvictedRecords, newBlobs)) {
- Y_VERIFY(portionInfo.TierName == evictFeatures.TargetTierName);
- evicted.emplace_back(std::move(portionInfo), evictFeatures);
- }
- }
-
- changes->PortionsToEvict.swap(evicted);
- return newBlobs;
-}
-}
diff --git a/ydb/core/tx/columnshard/engines/index_logic_logs.h b/ydb/core/tx/columnshard/engines/index_logic_logs.h
deleted file mode 100644
index 3b13506a5da..00000000000
--- a/ydb/core/tx/columnshard/engines/index_logic_logs.h
+++ /dev/null
@@ -1,123 +0,0 @@
-#pragma once
-
-#include "defs.h"
-#include "portion_info.h"
-#include "column_engine_logs.h"
-#include <ydb/core/tx/columnshard/counters.h>
-
-namespace NKikimr::NOlap {
-
-class TIndexLogicBase {
-protected:
- const TVersionedIndex& SchemaVersions;
- const NColumnShard::TIndexationCounters Counters;
- virtual TConclusion<std::vector<TString>> DoApply(std::shared_ptr<TColumnEngineChanges> indexChanges) const noexcept = 0;
-private:
- const THashMap<ui64, NKikimr::NOlap::TTiering>* TieringMap = nullptr;
-public:
- TIndexLogicBase(const TVersionedIndex& indexInfo, const THashMap<ui64, NKikimr::NOlap::TTiering>& tieringMap,
- const NColumnShard::TIndexationCounters& counters)
- : SchemaVersions(indexInfo)
- , Counters(counters)
- , TieringMap(&tieringMap)
- {
- }
-
- TIndexLogicBase(const TVersionedIndex& indexInfo, const NColumnShard::TIndexationCounters& counters)
- : SchemaVersions(indexInfo)
- , Counters(counters)
- {
- }
-
- virtual ~TIndexLogicBase() {
- }
- TConclusion<std::vector<TString>> Apply(std::shared_ptr<TColumnEngineChanges> indexChanges) const noexcept {
- {
- ui64 readBytes = 0;
- for (auto&& i : indexChanges->Blobs) {
- readBytes += i.first.Size;
- }
- Counters.CompactionInputSize(readBytes);
- }
- const TMonotonic start = TMonotonic::Now();
- TConclusion<std::vector<TString>> result = DoApply(indexChanges);
- if (result.IsSuccess()) {
- Counters.CompactionDuration->Collect((TMonotonic::Now() - start).MilliSeconds());
- } else {
- Counters.CompactionFails->Add(1);
- }
- return result;
- }
-
-protected:
- std::vector<TPortionInfo> MakeAppendedPortions(const ui64 pathId,
- const std::shared_ptr<arrow::RecordBatch> batch,
- const ui64 granule,
- const TSnapshot& minSnapshot,
- std::vector<TString>& blobs, const TGranuleMeta* granuleMeta) const;
-
- const THashMap<ui64, NKikimr::NOlap::TTiering>& GetTieringMap() const {
- if (TieringMap) {
- return *TieringMap;
- }
- return Default<THashMap<ui64, NKikimr::NOlap::TTiering>>();
- }
-};
-
-class TIndexationLogic: public TIndexLogicBase {
-public:
- using TIndexLogicBase::TIndexLogicBase;
-protected:
- virtual TConclusion<std::vector<TString>> DoApply(std::shared_ptr<TColumnEngineChanges> indexChanges) const noexcept override;
-private:
- // Although source batches are ordered only by PK (sorting key), the resulting pathBatches are ordered by the extended key.
- // They have constant snapshot columns that do not break sorting inside a batch.
- std::shared_ptr<arrow::RecordBatch> AddSpecials(const std::shared_ptr<arrow::RecordBatch>& srcBatch,
- const TIndexInfo& indexInfo, const TInsertedData& inserted) const;
-};
-
-class TCompactionLogic: public TIndexLogicBase {
-public:
- using TIndexLogicBase::TIndexLogicBase;
-
-protected:
- virtual TConclusion<std::vector<TString>> DoApply(std::shared_ptr<TColumnEngineChanges> indexChanges) const noexcept override;
-private:
- std::vector<TString> CompactSplitGranule(const std::shared_ptr<TCompactColumnEngineChanges>& changes) const;
- std::vector<TString> CompactInGranule(std::shared_ptr<TCompactColumnEngineChanges> changes) const;
- std::pair<std::shared_ptr<arrow::RecordBatch>, TSnapshot> CompactInOneGranule(ui64 granule, const std::vector<TPortionInfo>& portions, const THashMap<TBlobRange, TString>& blobs) const;
-
- /// @return vec({ts, batch}). ts0 <= ts1 <= ... <= tsN
- /// @note We use ts from the PK for splitting, but there can be lots of PKs with the same ts.
- std::vector<std::pair<TMark, std::shared_ptr<arrow::RecordBatch>>>
- SliceGranuleBatches(const TIndexInfo& indexInfo,
- const TCompactColumnEngineChanges& changes,
- const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches,
- const TMark& ts0) const;
-
- /// @param[in,out] portions unchanged or only inserted portions in the same order
- /// @param[in,out] tsIds unchanged or marks from compacted portions ordered by mark
- /// @param[in,out] toMove unchanged or compacted portions ordered by primary key
- ui64 TryMovePortions(const TMark& ts0,
- std::vector<TPortionInfo>& portions,
- std::vector<std::pair<TMark, ui64>>& tsIds,
- std::vector<std::pair<TPortionInfo, ui64>>& toMove) const;
-
- std::pair<std::vector<std::shared_ptr<arrow::RecordBatch>>, TSnapshot> PortionsToBatches(const std::vector<TPortionInfo>& portions,
- const THashMap<TBlobRange, TString>& blobs,
- bool insertedOnly = false) const;
-};
-
-class TEvictionLogic: public TIndexLogicBase {
-public:
- using TIndexLogicBase::TIndexLogicBase;
-
-protected:
- virtual TConclusion<std::vector<TString>> DoApply(std::shared_ptr<TColumnEngineChanges> indexChanges) const noexcept override;
-private:
- bool UpdateEvictedPortion(TPortionInfo& portionInfo,
- TPortionEvictionFeatures& evictFeatures, const THashMap<TBlobRange, TString>& srcBlobs,
- std::vector<TColumnRecord>& evictedRecords, std::vector<TString>& newBlobs) const;
-};
-
-}
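To keep the two large deletions above readable, here is a best-effort map, inferred only from hunks visible in this diff, of where the removed logic now lives:

    // TIndexLogicBase::MakeAppendedPortions        -> TChangesWithAppend::MakeAppendedPortions (changes/with_appended.cpp)
    // TSplitLimiter                                 -> ydb/core/tx/columnshard/splitter/splitter.{h,cpp}
    // TEvictionLogic::UpdateEvictedPortion/DoApply  -> TTTLColumnEngineChanges::UpdateEvictedPortion/DoConstructBlobs (changes/ttl.{h,cpp})
    // TIndexationLogic::DoApply                     -> presumably the insert changes' DoConstructBlobs (changes/indexation.cpp)
    // TCompactionLogic::DoApply                     -> presumably the in-granule/split compaction change classes (changes/in_granule_compaction.cpp, changes/split_compaction.cpp)
    // Callers now go through TColumnEngineChanges::ConstructBlobs(TConstructionContext&),
    // as in the eviction/indexing actor and ut_logs_engine.cpp hunks below.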
diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
index 57bec00c40c..ef49628706e 100644
--- a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
+++ b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
@@ -2,6 +2,7 @@
#include "indexed_read_data.h"
#include "filter.h"
#include "column_engine_logs.h"
+#include "changes/mark_granules.h"
#include <ydb/core/tx/columnshard/columnshard__index_scan.h>
#include <ydb/core/tx/columnshard/columnshard__stats_scan.h>
#include <ydb/core/formats/arrow/one_batch_input_stream.h>
diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp
index b20a716f9e6..a7a36b0813a 100644
--- a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp
+++ b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp
@@ -221,10 +221,11 @@ std::shared_ptr<arrow::Scalar> TPortionInfo::MinValue(ui32 columnId) const {
}
std::shared_ptr<arrow::Scalar> TPortionInfo::MaxValue(ui32 columnId) const {
- if (!Meta.ColumnMeta.contains(columnId)) {
+ auto it = Meta.ColumnMeta.find(columnId);
+ if (it == Meta.ColumnMeta.end()) {
return {};
}
- return Meta.ColumnMeta.find(columnId)->second.Max;
+ return it->second.Max;
}
std::shared_ptr<arrow::ChunkedArray> TPortionInfo::TPreparedColumn::Assemble() const {
diff --git a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
index 04617def4e5..914ef584cb0 100644
--- a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
+++ b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
@@ -1,7 +1,6 @@
#include <library/cpp/testing/unittest/registar.h>
#include "column_engine_logs.h"
#include "predicate/predicate.h"
-#include "index_logic_logs.h"
#include <ydb/core/tx/columnshard/columnshard_ut_common.h>
@@ -267,15 +266,16 @@ bool Insert(TColumnEngineForLogs& engine, TTestDbWrapper& db, TSnapshot snap,
}
changes->Blobs.insert(blobs.begin(), blobs.end());
+ changes->StartEmergency();
+
+ NOlap::TConstructionContext context(engine.GetVersionedIndex(), NColumnShard::TIndexationCounters("Indexation"));
+ std::vector<TString> newBlobs = std::move(changes->ConstructBlobs(context).DetachResult());
- TIndexationLogic logic(engine.GetVersionedIndex(), NColumnShard::TIndexationCounters("Indexation"));
- std::vector<TString> newBlobs = std::move(logic.Apply(changes).DetachResult());
UNIT_ASSERT_VALUES_EQUAL(changes->AppendedPortions.size(), 1);
UNIT_ASSERT_VALUES_EQUAL(newBlobs.size(), testColumns.size() + 2); // add 2 columns: planStep, txId
AddIdsToBlobs(newBlobs, changes->AppendedPortions, blobs, step);
- changes->StartEmergency();
const bool result = engine.ApplyChanges(db, changes, snap);
changes->AbortEmergency();
return result;
@@ -296,15 +296,15 @@ bool Compact(TColumnEngineForLogs& engine, TTestDbWrapper& db, TSnapshot snap, T
std::shared_ptr<TCompactColumnEngineChanges> changes = engine.StartCompaction(std::move(compactionInfo), TestLimits());
UNIT_ASSERT_VALUES_EQUAL(changes->SwitchedPortions.size(), expected.SrcPortions);
changes->SetBlobs(std::move(blobs));
+ changes->StartEmergency();
+ NOlap::TConstructionContext context(engine.GetVersionedIndex(), NColumnShard::TIndexationCounters("Compaction"));
+ std::vector<TString> newBlobs = std::move(changes->ConstructBlobs(context).DetachResult());
- TCompactionLogic logic(engine.GetVersionedIndex(), NColumnShard::TIndexationCounters("Compaction"));
- std::vector<TString> newBlobs = std::move(logic.Apply(changes).DetachResult());
UNIT_ASSERT_VALUES_EQUAL(changes->AppendedPortions.size(), expected.NewPortions);
AddIdsToBlobs(newBlobs, changes->AppendedPortions, changes->Blobs, step);
UNIT_ASSERT_VALUES_EQUAL(changes->GetTmpGranuleIds().size(), expected.NewGranules);
- changes->StartEmergency();
const bool result = engine.ApplyChanges(db, changes, snap);
changes->AbortEmergency();
return result;
diff --git a/ydb/core/tx/columnshard/engines/ut_program.cpp b/ydb/core/tx/columnshard/engines/ut_program.cpp
index 6b112a6a110..fecb3e8080e 100644
--- a/ydb/core/tx/columnshard/engines/ut_program.cpp
+++ b/ydb/core/tx/columnshard/engines/ut_program.cpp
@@ -1,5 +1,4 @@
#include "index_info.h"
-#include "index_logic_logs.h"
#include <ydb/core/tx/columnshard/columnshard__index_scan.h>
#include <ydb/core/tx/columnshard/columnshard_ut_common.h>
diff --git a/ydb/core/tx/columnshard/engines/ya.make b/ydb/core/tx/columnshard/engines/ya.make
index 6c1a4c78344..54de6c172cc 100644
--- a/ydb/core/tx/columnshard/engines/ya.make
+++ b/ydb/core/tx/columnshard/engines/ya.make
@@ -11,7 +11,6 @@ SRCS(
db_wrapper.cpp
index_info.cpp
indexed_read_data.cpp
- index_logic_logs.cpp
filter.cpp
portion_info.cpp
tier_info.cpp
diff --git a/ydb/core/tx/columnshard/eviction_actor.cpp b/ydb/core/tx/columnshard/eviction_actor.cpp
index f6221302782..82621bf8615 100644
--- a/ydb/core/tx/columnshard/eviction_actor.cpp
+++ b/ydb/core/tx/columnshard/eviction_actor.cpp
@@ -1,6 +1,5 @@
#include "columnshard_impl.h"
#include <ydb/core/tx/columnshard/engines/column_engine_logs.h>
-#include <ydb/core/tx/columnshard/engines/index_logic_logs.h>
#include "blob_cache.h"
namespace NKikimr::NColumnShard {
@@ -133,8 +132,8 @@ private:
auto guard = TxEvent->PutResult->StartCpuGuard();
TxEvent->IndexChanges->SetBlobs(std::move(Blobs));
- NOlap::TEvictionLogic evictionLogic(TxEvent->IndexInfo, TxEvent->Tiering, Counters);
- TxEvent->Blobs = std::move(evictionLogic.Apply(TxEvent->IndexChanges).DetachResult());
+ NOlap::TConstructionContext context(TxEvent->IndexInfo, TxEvent->Tiering, Counters);
+ TxEvent->Blobs = std::move(TxEvent->IndexChanges->ConstructBlobs(context).DetachResult());
if (TxEvent->Blobs.empty()) {
TxEvent->SetPutStatus(NKikimrProto::OK);
}
@@ -143,7 +142,6 @@ private:
ctx.Send(Parent, TxEvent.release());
LOG_S_DEBUG("Portions eviction finished (" << blobsSize << " new blobs) at tablet " << TabletId);
- //Die(ctx); // It's alive till tablet's death
}
};
diff --git a/ydb/core/tx/columnshard/indexing_actor.cpp b/ydb/core/tx/columnshard/indexing_actor.cpp
index cafdd83dcf4..e320d935589 100644
--- a/ydb/core/tx/columnshard/indexing_actor.cpp
+++ b/ydb/core/tx/columnshard/indexing_actor.cpp
@@ -2,7 +2,6 @@
#include "columnshard_impl.h"
#include "engines/changes/indexation.h"
#include <ydb/core/tx/columnshard/engines/column_engine_logs.h>
-#include <ydb/core/tx/columnshard/engines/index_logic_logs.h>
#include <ydb/core/tx/conveyor/usage/events.h>
#include <ydb/core/tx/conveyor/usage/service.h>
@@ -128,8 +127,8 @@ private:
virtual bool DoExecute() override {
auto guard = TxEvent->PutResult->StartCpuGuard();
- NOlap::TIndexationLogic indexationLogic(TxEvent->IndexInfo, TxEvent->Tiering, Counters);
- TxEvent->Blobs = std::move(indexationLogic.Apply(TxEvent->IndexChanges).DetachResult());
+ NOlap::TConstructionContext context(TxEvent->IndexInfo, TxEvent->Tiering, Counters);
+ TxEvent->Blobs = std::move(TxEvent->IndexChanges->ConstructBlobs(context).DetachResult());
return true;
}
public:
diff --git a/ydb/core/tx/columnshard/splitter/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/splitter/CMakeLists.darwin-x86_64.txt
new file mode 100644
index 00000000000..9fb62b49ac6
--- /dev/null
+++ b/ydb/core/tx/columnshard/splitter/CMakeLists.darwin-x86_64.txt
@@ -0,0 +1,20 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(tx-columnshard-splitter)
+target_link_libraries(tx-columnshard-splitter PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ libs-apache-arrow
+ columnshard-engines-storage
+ columnshard-engines-scheme
+)
+target_sources(tx-columnshard-splitter PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/splitter/splitter.cpp
+)
diff --git a/ydb/core/tx/columnshard/splitter/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/splitter/CMakeLists.linux-aarch64.txt
new file mode 100644
index 00000000000..014ec6fd94d
--- /dev/null
+++ b/ydb/core/tx/columnshard/splitter/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,21 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(tx-columnshard-splitter)
+target_link_libraries(tx-columnshard-splitter PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ libs-apache-arrow
+ columnshard-engines-storage
+ columnshard-engines-scheme
+)
+target_sources(tx-columnshard-splitter PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/splitter/splitter.cpp
+)
diff --git a/ydb/core/tx/columnshard/splitter/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/splitter/CMakeLists.linux-x86_64.txt
new file mode 100644
index 00000000000..014ec6fd94d
--- /dev/null
+++ b/ydb/core/tx/columnshard/splitter/CMakeLists.linux-x86_64.txt
@@ -0,0 +1,21 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(tx-columnshard-splitter)
+target_link_libraries(tx-columnshard-splitter PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ libs-apache-arrow
+ columnshard-engines-storage
+ columnshard-engines-scheme
+)
+target_sources(tx-columnshard-splitter PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/splitter/splitter.cpp
+)
diff --git a/ydb/core/tx/columnshard/splitter/CMakeLists.txt b/ydb/core/tx/columnshard/splitter/CMakeLists.txt
new file mode 100644
index 00000000000..f8b31df0c11
--- /dev/null
+++ b/ydb/core/tx/columnshard/splitter/CMakeLists.txt
@@ -0,0 +1,17 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin-x86_64.txt)
+elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
+ include(CMakeLists.windows-x86_64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-x86_64.txt)
+endif()
diff --git a/ydb/core/tx/columnshard/splitter/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/splitter/CMakeLists.windows-x86_64.txt
new file mode 100644
index 00000000000..9fb62b49ac6
--- /dev/null
+++ b/ydb/core/tx/columnshard/splitter/CMakeLists.windows-x86_64.txt
@@ -0,0 +1,20 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(tx-columnshard-splitter)
+target_link_libraries(tx-columnshard-splitter PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ libs-apache-arrow
+ columnshard-engines-storage
+ columnshard-engines-scheme
+)
+target_sources(tx-columnshard-splitter PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/splitter/splitter.cpp
+)
diff --git a/ydb/core/tx/columnshard/splitter/splitter.cpp b/ydb/core/tx/columnshard/splitter/splitter.cpp
new file mode 100644
index 00000000000..eb22d088b61
--- /dev/null
+++ b/ydb/core/tx/columnshard/splitter/splitter.cpp
@@ -0,0 +1,112 @@
+#include "splitter.h"
+
+namespace NKikimr::NOlap {
+
+TSplitLimiter::TSplitLimiter(const TGranuleMeta* granuleMeta, const NColumnShard::TIndexationCounters& counters, ISnapshotSchema::TPtr schema, const std::shared_ptr<arrow::RecordBatch> batch)
+    : Counters(counters)
+    , Batch(batch), Schema(schema) {
+ if (granuleMeta && granuleMeta->GetAdditiveSummary().GetOther().GetRecordsCount()) {
+ Y_VERIFY(granuleMeta->GetHardSummary().GetColumnIdsSortedBySizeDescending().size());
+ SortedColumnIds = granuleMeta->GetHardSummary().GetColumnIdsSortedBySizeDescending();
+ const auto biggestColumn = SortedColumnIds.front();
+ Y_VERIFY(biggestColumn.GetPackedBlobsSize());
+ const double expectedPackedRecordSize = 1.0 * biggestColumn.GetPackedBlobsSize() / granuleMeta->GetAdditiveSummary().GetOther().GetRecordsCount();
+ BaseStepRecordsCount = ExpectedBlobSize / expectedPackedRecordSize;
+ for (ui32 i = 1; i < SortedColumnIds.size(); ++i) {
+ Y_VERIFY(SortedColumnIds[i - 1].GetPackedBlobsSize() >= SortedColumnIds[i].GetPackedBlobsSize());
+ }
+ if (BaseStepRecordsCount > batch->num_rows()) {
+ BaseStepRecordsCount = batch->num_rows();
+ } else {
+ BaseStepRecordsCount = batch->num_rows() / (ui32)(batch->num_rows() / BaseStepRecordsCount);
+ if (BaseStepRecordsCount * expectedPackedRecordSize > TCompactionLimits::MAX_BLOB_SIZE) {
+ BaseStepRecordsCount = ExpectedBlobSize / expectedPackedRecordSize;
+ }
+ }
+ } else {
+ for (auto&& i : Schema->GetIndexInfo().GetColumnIds()) {
+ SortedColumnIds.emplace_back(TColumnSummary(i));
+ }
+ BaseStepRecordsCount = batch->num_rows();
+ }
+ BaseStepRecordsCount = std::min<ui32>(BaseStepRecordsCount, Batch->num_rows());
+ Y_VERIFY(BaseStepRecordsCount);
+ CurrentStepRecordsCount = BaseStepRecordsCount;
+}
+
+bool TSplitLimiter::Next(std::vector<TString>& portionBlobs, std::shared_ptr<arrow::RecordBatch>& batch, const TSaverContext& saverContext) {
+ if (Position == Batch->num_rows()) {
+ return false;
+ }
+
+ portionBlobs.resize(Schema->GetSchema()->num_fields());
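+ // Re-slice until every column of the current slice serializes below MAX_BLOB_SIZE.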
+ while (true) {
+ Y_VERIFY(Position < Batch->num_rows());
+ std::shared_ptr<arrow::RecordBatch> currentBatch;
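+ // Take the remaining rows if they are within ~10% of one step; otherwise take exactly one step.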
+ if (Batch->num_rows() - Position < CurrentStepRecordsCount * 1.1) {
+ currentBatch = Batch->Slice(Position, Batch->num_rows() - Position);
+ } else {
+ currentBatch = Batch->Slice(Position, CurrentStepRecordsCount);
+ }
+
+ ui32 fillCounter = 0;
+ for (const auto& columnSummary : SortedColumnIds) {
+ const TString& columnName = Schema->GetIndexInfo().GetColumnName(columnSummary.GetColumnId());
+ const int idx = Schema->GetFieldIndex(columnSummary.GetColumnId());
+ Y_VERIFY(idx >= 0);
+ auto field = Schema->GetFieldByIndex(idx);
+ Y_VERIFY(field);
+ auto array = currentBatch->GetColumnByName(columnName);
+ Y_VERIFY(array);
+ auto columnSaver = Schema->GetColumnSaver(columnSummary.GetColumnId(), saverContext);
+ TString blob = TPortionInfo::SerializeColumn(array, field, columnSaver);
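+ // An oversized column blob invalidates the slice: shrink the step proportionally and retry.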
+ if (blob.size() >= TCompactionLimits::MAX_BLOB_SIZE) {
+ Counters.TrashDataSerializationBytes->Add(blob.size());
+ Counters.TrashDataSerialization->Add(1);
+ Counters.TrashDataSerializationHistogramBytes->Collect(blob.size());
+ const double kffNew = 1.0 * ExpectedBlobSize / blob.size() * ReduceCorrectionKff;
+ CurrentStepRecordsCount = currentBatch->num_rows() * kffNew;
+ Y_VERIFY(CurrentStepRecordsCount);
+ break;
+ } else {
+ Counters.CorrectDataSerializationBytes->Add(blob.size());
+ Counters.CorrectDataSerialization->Add(1);
+ }
+
+ portionBlobs[idx] = std::move(blob);
+ ++fillCounter;
+ }
+
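+ // Every column serialized within limits: commit the slice and tune the step for the next one.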
+ if (fillCounter == portionBlobs.size()) {
+ Position += currentBatch->num_rows();
+ Y_VERIFY(Position <= Batch->num_rows());
+ ui64 maxBlobSize = 0;
+ for (auto&& i : portionBlobs) {
+ Counters.SplittedPortionColumnSize->Collect(i.size());
+ maxBlobSize = std::max<ui64>(maxBlobSize, i.size());
+ }
+ batch = currentBatch;
+ if (maxBlobSize < MinBlobSize) {
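+ // The largest blob is below MinBlobSize: count it as too small unless this single slice
+ // covered the whole batch, then enlarge the step for the following slice.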
+ if (Position != currentBatch->num_rows() || Position != Batch->num_rows()) {
+ Counters.SplittedPortionLargestColumnSize->Collect(maxBlobSize);
+ Counters.TooSmallBlob->Add(1);
+ if (Position == Batch->num_rows()) {
+ Counters.TooSmallBlobFinish->Add(1);
+ }
+ if (Position == currentBatch->num_rows()) {
+ Counters.TooSmallBlobStart->Add(1);
+ }
+ } else {
+ Counters.SimpleSplitPortionLargestColumnSize->Collect(maxBlobSize);
+ }
+ CurrentStepRecordsCount = currentBatch->num_rows() * IncreaseCorrectionKff;
+ } else {
+ Counters.SplittedPortionLargestColumnSize->Collect(maxBlobSize);
+ }
+ return true;
+ }
+ }
+}
+
+} // namespace NKikimr::NOlap
diff --git a/ydb/core/tx/columnshard/splitter/splitter.h b/ydb/core/tx/columnshard/splitter/splitter.h
new file mode 100644
index 00000000000..6528fd7ab6b
--- /dev/null
+++ b/ydb/core/tx/columnshard/splitter/splitter.h
@@ -0,0 +1,32 @@
+#pragma once
+#include <ydb/core/tx/columnshard/counters/indexation.h>
+#include <ydb/core/tx/columnshard/engines/scheme/column_features.h>
+#include <ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h>
+#include <ydb/core/tx/columnshard/engines/storage/granule.h>
+
+#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h>
+
+namespace NKikimr::NOlap {
+
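+// Slices an arrow::RecordBatch so that each serialized column blob of a slice stays under
+// TCompactionLimits::MAX_BLOB_SIZE while aiming for roughly ExpectedBlobSize per blob.
+// A sketch of the expected call pattern (variable names here are illustrative only):
+//
+//   TSplitLimiter limiter(granuleMeta, counters, schema, batch);
+//   std::vector<TString> blobs;
+//   std::shared_ptr<arrow::RecordBatch> slice;
+//   while (limiter.Next(blobs, slice, saverContext)) {
+//       // build a portion from the slice and its blobs
+//   }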
+class TSplitLimiter {
+private:
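+ // Blob size targets and the multiplicative factors used to adapt the slice size between calls.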
+ static const inline double ReduceCorrectionKff = 0.9;
+ static const inline double IncreaseCorrectionKff = 1.1;
+ static const inline ui64 ExpectedBlobSize = 6 * 1024 * 1024;
+ static const inline ui64 MinBlobSize = 1 * 1024 * 1024;
+
+ const NColumnShard::TIndexationCounters Counters;
+ ui32 BaseStepRecordsCount = 0;
+ ui32 CurrentStepRecordsCount = 0;
+ std::shared_ptr<arrow::RecordBatch> Batch;
+ std::vector<TColumnSummary> SortedColumnIds;
+ ui32 Position = 0;
+ ISnapshotSchema::TPtr Schema;
+public:
+ TSplitLimiter(const TGranuleMeta* granuleMeta, const NColumnShard::TIndexationCounters& counters,
+ ISnapshotSchema::TPtr schema, const std::shared_ptr<arrow::RecordBatch> batch);
+
+ bool Next(std::vector<TString>& portionBlobs, std::shared_ptr<arrow::RecordBatch>& batch, const TSaverContext& saverContext);
+};
+
+} // namespace NKikimr::NOlap
diff --git a/ydb/core/tx/columnshard/splitter/ya.make b/ydb/core/tx/columnshard/splitter/ya.make
new file mode 100644
index 00000000000..88823997832
--- /dev/null
+++ b/ydb/core/tx/columnshard/splitter/ya.make
@@ -0,0 +1,13 @@
+LIBRARY()
+
+SRCS(
+ splitter.cpp
+)
+
+PEERDIR(
+ contrib/libs/apache/arrow
+ ydb/core/tx/columnshard/engines/storage
+ ydb/core/tx/columnshard/engines/scheme
+)
+
+END()
diff --git a/ydb/core/tx/columnshard/tables_manager.cpp b/ydb/core/tx/columnshard/tables_manager.cpp
index 24afb2c78e0..0633adac11d 100644
--- a/ydb/core/tx/columnshard/tables_manager.cpp
+++ b/ydb/core/tx/columnshard/tables_manager.cpp
@@ -283,11 +283,6 @@ void TTablesManager::IndexSchemaVersion(const TRowVersion& version, const NKikim
}
}
-std::shared_ptr<NOlap::TCleanupColumnEngineChanges> TTablesManager::StartIndexCleanup(const NOlap::TSnapshot& snapshot, const NOlap::TCompactionLimits& limits, ui32 maxRecords) {
- Y_VERIFY(PrimaryIndex);
- return PrimaryIndex->StartCleanup(snapshot, limits, PathsToDrop, maxRecords);
-}
-
NOlap::TIndexInfo TTablesManager::DeserializeIndexInfoFromProto(const NKikimrSchemeOp::TColumnTableSchema& schema) {
std::optional<NOlap::TIndexInfo> indexInfo = NOlap::TIndexInfo::BuildFromProto(schema);
Y_VERIFY(indexInfo);
diff --git a/ydb/core/tx/columnshard/tables_manager.h b/ydb/core/tx/columnshard/tables_manager.h
index 8fd3273c1fb..af2f9780704 100644
--- a/ydb/core/tx/columnshard/tables_manager.h
+++ b/ydb/core/tx/columnshard/tables_manager.h
@@ -150,6 +150,10 @@ public:
return PathsToDrop;
}
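+ // Mutable access to the drop queue; added since StartIndexCleanup has moved out of TTablesManager.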
+ THashSet<ui64>& MutablePathsToDrop() {
+ return PathsToDrop;
+ }
+
const THashMap<ui64, TTableInfo>& GetTables() const {
return Tables;
}
@@ -208,9 +212,6 @@ public:
void AddTableVersion(const ui64 pathId, const TRowVersion& version, const TTableInfo::TTableVersionInfo& versionInfo, NIceDb::TNiceDb& db);
void OnTtlUpdate();
-
- std::shared_ptr<NOlap::TCleanupColumnEngineChanges> StartIndexCleanup(const NOlap::TSnapshot& snapshot, const NOlap::TCompactionLimits& limits, ui32 maxRecords);
-
private:
void IndexSchemaVersion(const TRowVersion& version, const NKikimrSchemeOp::TColumnTableSchema& schema);
static NOlap::TIndexInfo DeserializeIndexInfoFromProto(const NKikimrSchemeOp::TColumnTableSchema& schema);
diff --git a/ydb/core/tx/columnshard/ya.make b/ydb/core/tx/columnshard/ya.make
index 52adbd0fe65..14b1e978ca0 100644
--- a/ydb/core/tx/columnshard/ya.make
+++ b/ydb/core/tx/columnshard/ya.make
@@ -56,6 +56,7 @@ PEERDIR(
ydb/core/tx/columnshard/engines/writer
ydb/core/tx/columnshard/counters
ydb/core/tx/columnshard/common
+ ydb/core/tx/columnshard/splitter
ydb/core/tx/tiering
ydb/core/tx/conveyor/usage
ydb/core/tx/long_tx_service/public