diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-07-23 10:00:51 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-07-23 10:00:51 +0300 |
commit | b71a866bb25a4da400b09b0dc754094946a9f67d (patch) | |
tree | 035bff4eff806852ebaae3e70ebeec22e24c66a8 | |
parent | c402475a0a16ee3bb780ef7849a19bac8956e21f (diff) | |
download | ydb-b71a866bb25a4da400b09b0dc754094946a9f67d.tar.gz |
KIKIMR-18796: finalize encapsulation of service activity in CS
51 files changed, 1201 insertions, 1040 deletions
diff --git a/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt index 85db168ab0f..07c0eaea85c 100644 --- a/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt @@ -11,6 +11,7 @@ add_subdirectory(counters) add_subdirectory(engines) add_subdirectory(hooks) add_subdirectory(resources) +add_subdirectory(splitter) add_subdirectory(ut_rw) add_subdirectory(ut_schema) get_built_tool_path( @@ -48,6 +49,7 @@ target_link_libraries(core-tx-columnshard PUBLIC columnshard-engines-writer tx-columnshard-counters tx-columnshard-common + tx-columnshard-splitter core-tx-tiering tx-conveyor-usage tx-long_tx_service-public diff --git a/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt index 44e7122c497..14cdf344f20 100644 --- a/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt @@ -11,6 +11,7 @@ add_subdirectory(counters) add_subdirectory(engines) add_subdirectory(hooks) add_subdirectory(resources) +add_subdirectory(splitter) add_subdirectory(ut_rw) add_subdirectory(ut_schema) get_built_tool_path( @@ -49,6 +50,7 @@ target_link_libraries(core-tx-columnshard PUBLIC columnshard-engines-writer tx-columnshard-counters tx-columnshard-common + tx-columnshard-splitter core-tx-tiering tx-conveyor-usage tx-long_tx_service-public diff --git a/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt index 44e7122c497..14cdf344f20 100644 --- a/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt +++ b/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt @@ -11,6 +11,7 @@ add_subdirectory(counters) add_subdirectory(engines) add_subdirectory(hooks) add_subdirectory(resources) +add_subdirectory(splitter) add_subdirectory(ut_rw) add_subdirectory(ut_schema) get_built_tool_path( @@ -49,6 +50,7 @@ 
target_link_libraries(core-tx-columnshard PUBLIC columnshard-engines-writer tx-columnshard-counters tx-columnshard-common + tx-columnshard-splitter core-tx-tiering tx-conveyor-usage tx-long_tx_service-public diff --git a/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt index 85db168ab0f..07c0eaea85c 100644 --- a/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt +++ b/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt @@ -11,6 +11,7 @@ add_subdirectory(counters) add_subdirectory(engines) add_subdirectory(hooks) add_subdirectory(resources) +add_subdirectory(splitter) add_subdirectory(ut_rw) add_subdirectory(ut_schema) get_built_tool_path( @@ -48,6 +49,7 @@ target_link_libraries(core-tx-columnshard PUBLIC columnshard-engines-writer tx-columnshard-counters tx-columnshard-common + tx-columnshard-splitter core-tx-tiering tx-conveyor-usage tx-long_tx_service-public diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp index e1a13775299..990415922de 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.cpp +++ b/ydb/core/tx/columnshard/columnshard_impl.cpp @@ -838,7 +838,8 @@ std::unique_ptr<TEvPrivate::TEvWriteIndex> TColumnShard::SetupCleanup() { NOlap::TSnapshot cleanupSnapshot{GetMinReadStep(), 0}; - auto changes = TablesManager.StartIndexCleanup(cleanupSnapshot, CompactionLimits.Get(), TLimits::MAX_TX_RECORDS); + auto changes = + TablesManager.MutablePrimaryIndex().StartCleanup(cleanupSnapshot, CompactionLimits.Get(), TablesManager.MutablePathsToDrop(), TLimits::MAX_TX_RECORDS); if (!changes) { LOG_S_INFO("Cannot prepare cleanup at tablet " << TabletID()); return {}; diff --git a/ydb/core/tx/columnshard/compaction_actor.cpp b/ydb/core/tx/columnshard/compaction_actor.cpp index e9adfe87a5b..c771ce04c59 100644 --- a/ydb/core/tx/columnshard/compaction_actor.cpp +++ b/ydb/core/tx/columnshard/compaction_actor.cpp @@ -2,7 +2,6 @@ #include "blob_cache.h" 
#include <ydb/core/tx/columnshard/engines/column_engine_logs.h> -#include <ydb/core/tx/columnshard/engines/index_logic_logs.h> #include <ydb/core/tx/conveyor/usage/events.h> #include <ydb/core/tx/conveyor/usage/service.h> @@ -135,8 +134,8 @@ private: virtual bool DoExecute() override { auto guard = TxEvent->PutResult->StartCpuGuard(); - NOlap::TCompactionLogic compactionLogic(TxEvent->IndexInfo, TxEvent->Tiering, Counters); - TxEvent->Blobs = std::move(compactionLogic.Apply(TxEvent->IndexChanges).DetachResult()); + NOlap::TConstructionContext context(TxEvent->IndexInfo, TxEvent->Tiering, Counters); + TxEvent->Blobs = std::move(TxEvent->IndexChanges->ConstructBlobs(context).DetachResult()); return true; } public: diff --git a/ydb/core/tx/columnshard/engines/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/engines/CMakeLists.darwin-x86_64.txt index 8c5b9dab27a..94f29e68936 100644 --- a/ydb/core/tx/columnshard/engines/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/tx/columnshard/engines/CMakeLists.darwin-x86_64.txt @@ -47,7 +47,6 @@ target_sources(tx-columnshard-engines PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/db_wrapper.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/index_info.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/indexed_read_data.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/index_logic_logs.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/filter.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portion_info.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/tier_info.cpp diff --git a/ydb/core/tx/columnshard/engines/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/engines/CMakeLists.linux-aarch64.txt index 7faf17091fd..08ceed7b245 100644 --- a/ydb/core/tx/columnshard/engines/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tx/columnshard/engines/CMakeLists.linux-aarch64.txt @@ -48,7 +48,6 @@ target_sources(tx-columnshard-engines PRIVATE 
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/db_wrapper.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/index_info.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/indexed_read_data.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/index_logic_logs.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/filter.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portion_info.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/tier_info.cpp diff --git a/ydb/core/tx/columnshard/engines/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/engines/CMakeLists.linux-x86_64.txt index 7faf17091fd..08ceed7b245 100644 --- a/ydb/core/tx/columnshard/engines/CMakeLists.linux-x86_64.txt +++ b/ydb/core/tx/columnshard/engines/CMakeLists.linux-x86_64.txt @@ -48,7 +48,6 @@ target_sources(tx-columnshard-engines PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/db_wrapper.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/index_info.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/indexed_read_data.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/index_logic_logs.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/filter.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portion_info.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/tier_info.cpp diff --git a/ydb/core/tx/columnshard/engines/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/engines/CMakeLists.windows-x86_64.txt index 8c5b9dab27a..94f29e68936 100644 --- a/ydb/core/tx/columnshard/engines/CMakeLists.windows-x86_64.txt +++ b/ydb/core/tx/columnshard/engines/CMakeLists.windows-x86_64.txt @@ -47,7 +47,6 @@ target_sources(tx-columnshard-engines PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/db_wrapper.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/index_info.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/indexed_read_data.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/index_logic_logs.cpp 
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/filter.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portion_info.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/tier_info.cpp diff --git a/ydb/core/tx/columnshard/engines/changes/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/engines/changes/CMakeLists.darwin-x86_64.txt index eae85c341ba..922cccb5001 100644 --- a/ydb/core/tx/columnshard/engines/changes/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/tx/columnshard/engines/changes/CMakeLists.darwin-x86_64.txt @@ -20,6 +20,7 @@ target_link_libraries(columnshard-engines-changes PUBLIC core-formats-arrow tx-columnshard-common columnshard-engines-insert_table + tx-columnshard-splitter ydb-core-tablet_flat core-tx-tiering ydb-core-protos @@ -29,6 +30,8 @@ target_sources(columnshard-engines-changes PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/abstract.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/mark.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/compaction.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/in_granule_compaction.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/split_compaction.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/ttl.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/indexation.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/cleanup.cpp diff --git a/ydb/core/tx/columnshard/engines/changes/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/engines/changes/CMakeLists.linux-aarch64.txt index c7f073d7d3a..bf1a569276b 100644 --- a/ydb/core/tx/columnshard/engines/changes/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tx/columnshard/engines/changes/CMakeLists.linux-aarch64.txt @@ -21,6 +21,7 @@ target_link_libraries(columnshard-engines-changes PUBLIC core-formats-arrow tx-columnshard-common columnshard-engines-insert_table + tx-columnshard-splitter ydb-core-tablet_flat core-tx-tiering 
ydb-core-protos @@ -30,6 +31,8 @@ target_sources(columnshard-engines-changes PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/abstract.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/mark.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/compaction.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/in_granule_compaction.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/split_compaction.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/ttl.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/indexation.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/cleanup.cpp diff --git a/ydb/core/tx/columnshard/engines/changes/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/engines/changes/CMakeLists.linux-x86_64.txt index c7f073d7d3a..bf1a569276b 100644 --- a/ydb/core/tx/columnshard/engines/changes/CMakeLists.linux-x86_64.txt +++ b/ydb/core/tx/columnshard/engines/changes/CMakeLists.linux-x86_64.txt @@ -21,6 +21,7 @@ target_link_libraries(columnshard-engines-changes PUBLIC core-formats-arrow tx-columnshard-common columnshard-engines-insert_table + tx-columnshard-splitter ydb-core-tablet_flat core-tx-tiering ydb-core-protos @@ -30,6 +31,8 @@ target_sources(columnshard-engines-changes PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/abstract.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/mark.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/compaction.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/in_granule_compaction.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/split_compaction.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/ttl.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/indexation.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/cleanup.cpp diff --git 
a/ydb/core/tx/columnshard/engines/changes/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/engines/changes/CMakeLists.windows-x86_64.txt index eae85c341ba..922cccb5001 100644 --- a/ydb/core/tx/columnshard/engines/changes/CMakeLists.windows-x86_64.txt +++ b/ydb/core/tx/columnshard/engines/changes/CMakeLists.windows-x86_64.txt @@ -20,6 +20,7 @@ target_link_libraries(columnshard-engines-changes PUBLIC core-formats-arrow tx-columnshard-common columnshard-engines-insert_table + tx-columnshard-splitter ydb-core-tablet_flat core-tx-tiering ydb-core-protos @@ -29,6 +30,8 @@ target_sources(columnshard-engines-changes PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/abstract.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/mark.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/compaction.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/in_granule_compaction.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/split_compaction.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/ttl.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/indexation.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/cleanup.cpp diff --git a/ydb/core/tx/columnshard/engines/changes/abstract.cpp b/ydb/core/tx/columnshard/engines/changes/abstract.cpp index 332b66977a4..323c59c07cb 100644 --- a/ydb/core/tx/columnshard/engines/changes/abstract.cpp +++ b/ydb/core/tx/columnshard/engines/changes/abstract.cpp @@ -11,6 +11,84 @@ TString TColumnEngineChanges::DebugString() const { return sb.Str(); } +NKikimr::TConclusion<std::vector<TString>> TColumnEngineChanges::ConstructBlobs(TConstructionContext& context) { + Y_VERIFY(Stage == EStage::Started); + + { + ui64 readBytes = 0; + for (auto&& i : Blobs) { + readBytes += i.first.Size; + } + context.Counters.CompactionInputSize(readBytes); + } + const TMonotonic start = TMonotonic::Now(); + TConclusion<std::vector<TString>> result = 
DoConstructBlobs(context); + if (result.IsSuccess()) { + context.Counters.CompactionDuration->Collect((TMonotonic::Now() - start).MilliSeconds()); + } else { + context.Counters.CompactionFails->Add(1); + } + Stage = EStage::Constructed; + return result; +} + +bool TColumnEngineChanges::ApplyChanges(TColumnEngineForLogs& self, TApplyChangesContext& context, const bool dryRun) { + Y_VERIFY(Stage != EStage::Aborted); + if ((ui32)Stage >= (ui32)EStage::Applied) { + return true; + } + Y_VERIFY(Stage == EStage::Compiled); + + if (!DoApplyChanges(self, context, dryRun)) { + if (dryRun) { + OnChangesApplyFailed("problems on apply"); + } + Y_VERIFY(dryRun); + return false; + } else if (!dryRun) { + OnChangesApplyFinished(); + Stage = EStage::Applied; + } + return true; +} + +void TColumnEngineChanges::WriteIndex(NColumnShard::TColumnShard& self, TWriteIndexContext& context) { + Y_VERIFY(Stage != EStage::Aborted); + if ((ui32)Stage >= (ui32)EStage::Written) { + return; + } + Y_VERIFY(Stage == EStage::Applied); + + DoWriteIndex(self, context); + Stage = EStage::Written; +} + +void TColumnEngineChanges::WriteIndexComplete(NColumnShard::TColumnShard& self, TWriteIndexCompleteContext& context) { + Y_VERIFY(Stage == EStage::Aborted || Stage == EStage::Written); + if (Stage == EStage::Aborted) { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "WriteIndexComplete")("stage", Stage); + return; + } + if (Stage == EStage::Written) { + Stage = EStage::Finished; + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "WriteIndexComplete")("type", TypeString())("success", context.FinishedSuccessfully); + DoWriteIndexComplete(self, context); + DoOnFinish(self, context); + } +} + +void TColumnEngineChanges::Compile(TFinalizationContext& context) noexcept { + Y_VERIFY(Stage != EStage::Aborted); + if ((ui32)Stage >= (ui32)EStage::Compiled) { + return; + } + Y_VERIFY(Stage == EStage::Constructed); + + DoCompile(context); + + Stage = EStage::Compiled; +} + 
TWriteIndexContext::TWriteIndexContext(NTabletFlatExecutor::TTransactionContext& txc, IDbWrapper& dbWrapper) : Txc(txc) , BlobManagerDb(std::make_shared<NColumnShard::TBlobManagerDb>(txc.DB)) diff --git a/ydb/core/tx/columnshard/engines/changes/abstract.h b/ydb/core/tx/columnshard/engines/changes/abstract.h index 49e4c967046..618aab096c6 100644 --- a/ydb/core/tx/columnshard/engines/changes/abstract.h +++ b/ydb/core/tx/columnshard/engines/changes/abstract.h @@ -23,6 +23,7 @@ class TBackgroundActivity; namespace NKikimr::NOlap { class TColumnEngineForLogs; +class TVersionedIndex; struct TCompactionLimits { static constexpr const ui64 MIN_GOOD_BLOB_SIZE = 256 * 1024; // some BlobStorage constant @@ -134,6 +135,37 @@ public: } }; +class TConstructionContext: TNonCopyable { +public: + using TTieringsHash = THashMap<ui64, NKikimr::NOlap::TTiering>; +private: + const TTieringsHash* TieringMap = nullptr; +public: + const TVersionedIndex& SchemaVersions; + const NColumnShard::TIndexationCounters Counters; + + TConstructionContext(const TVersionedIndex& schemaVersions, const TTieringsHash& tieringMap, const NColumnShard::TIndexationCounters counters) + : TieringMap(&tieringMap) + , SchemaVersions(schemaVersions) + , Counters(counters) + { + + } + + TConstructionContext(const TVersionedIndex& schemaVersions, const NColumnShard::TIndexationCounters counters) + : SchemaVersions(schemaVersions) + , Counters(counters) { + + } + + const THashMap<ui64, NKikimr::NOlap::TTiering>& GetTieringMap() const { + if (TieringMap) { + return *TieringMap; + } + return Default<THashMap<ui64, NKikimr::NOlap::TTiering>>(); + } +}; + class TGranuleMeta; class TColumnEngineChanges { @@ -141,6 +173,7 @@ public: enum class EStage: ui32 { Created = 0, Started, + Constructed, Compiled, Applied, Written, @@ -163,8 +196,15 @@ protected: virtual void DoAbort() { } + virtual bool NeedConstruction() const { + return true; + } virtual void DoStart(NColumnShard::TColumnShard& self) = 0; + virtual 
TConclusion<std::vector<TString>> DoConstructBlobs(TConstructionContext& context) noexcept = 0; + public: + TConclusion<std::vector<TString>> ConstructBlobs(TConstructionContext& context); + void AbortEmergency() { AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "AbortEmergency"); Stage = EStage::Aborted; @@ -178,80 +218,37 @@ public: } virtual ~TColumnEngineChanges() { - Y_VERIFY(Stage == EStage::Created || Stage == EStage::Finished || Stage == EStage::Aborted); + Y_VERIFY_DEBUG(Stage == EStage::Created || Stage == EStage::Finished || Stage == EStage::Aborted); } void Start(NColumnShard::TColumnShard& self) { Y_VERIFY(Stage == EStage::Created); DoStart(self); Stage = EStage::Started; + if (!NeedConstruction()) { + Stage = EStage::Constructed; + } } void StartEmergency() { Y_VERIFY(Stage == EStage::Created); Stage = EStage::Started; - } - - bool ApplyChanges(TColumnEngineForLogs& self, TApplyChangesContext& context, const bool dryRun) { - Y_VERIFY(Stage != EStage::Aborted); - if ((ui32)Stage >= (ui32)EStage::Applied) { - return true; + if (!NeedConstruction()) { + Stage = EStage::Constructed; } - Y_VERIFY(Stage == EStage::Compiled); - - if (!DoApplyChanges(self, context, dryRun)) { - if (dryRun) { - OnChangesApplyFailed("problems on apply"); - } - Y_VERIFY(dryRun); - return false; - } else if (!dryRun) { - OnChangesApplyFinished(); - Stage = EStage::Applied; - } - return true; } + bool ApplyChanges(TColumnEngineForLogs& self, TApplyChangesContext& context, const bool dryRun); + virtual ui32 GetWritePortionsCount() const = 0; virtual const TPortionInfo& GetWritePortionInfo(const ui32 index) const = 0; virtual bool NeedWritePortion(const ui32 index) const = 0; virtual void UpdateWritePortionInfo(const ui32 index, const TPortionInfo& info) = 0; - void WriteIndex(NColumnShard::TColumnShard& self, TWriteIndexContext& context) { - Y_VERIFY(Stage != EStage::Aborted); - if ((ui32)Stage >= (ui32)EStage::Written) { - return; - } - Y_VERIFY(Stage == EStage::Applied); - - 
DoWriteIndex(self, context); - Stage = EStage::Written; - } - void WriteIndexComplete(NColumnShard::TColumnShard& self, TWriteIndexCompleteContext& context) { - Y_VERIFY(Stage == EStage::Aborted || Stage == EStage::Written); - if (Stage == EStage::Aborted) { - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "WriteIndexComplete")("stage", Stage); - return; - } - if (Stage == EStage::Written) { - Stage = EStage::Finished; - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "WriteIndexComplete")("type", TypeString())("success", context.FinishedSuccessfully); - DoWriteIndexComplete(self, context); - DoOnFinish(self, context); - } - } + void WriteIndex(NColumnShard::TColumnShard& self, TWriteIndexContext& context); + void WriteIndexComplete(NColumnShard::TColumnShard& self, TWriteIndexCompleteContext& context); - void Compile(TFinalizationContext& context) noexcept { - Y_VERIFY(Stage != EStage::Aborted); - if ((ui32)Stage >= (ui32)EStage::Compiled) { - return; - } - Y_VERIFY(Stage == EStage::Started); - - DoCompile(context); - - Stage = EStage::Compiled; - } + void Compile(TFinalizationContext& context) noexcept; void SetBlobs(THashMap<TBlobRange, TString>&& blobs) { Y_VERIFY(!blobs.empty()); @@ -260,7 +257,6 @@ public: TSnapshot InitSnapshot = TSnapshot::Zero(); THashMap<TBlobRange, TString> Blobs; - bool NeedRepeat{false}; virtual THashMap<TUnifiedBlobId, std::vector<TBlobRange>> GetGroupedBlobRanges() const = 0; virtual TString TypeString() const = 0; diff --git a/ydb/core/tx/columnshard/engines/changes/cleanup.h b/ydb/core/tx/columnshard/engines/changes/cleanup.h index 70e349ba186..33c971b6903 100644 --- a/ydb/core/tx/columnshard/engines/changes/cleanup.h +++ b/ydb/core/tx/columnshard/engines/changes/cleanup.h @@ -15,8 +15,16 @@ protected: virtual void DoWriteIndex(NColumnShard::TColumnShard& self, TWriteIndexContext& context) override; virtual void DoCompile(TFinalizationContext& /*context*/) override { } + virtual NKikimr::TConclusion<std::vector<TString>> 
DoConstructBlobs(TConstructionContext& /*context*/) noexcept override { + return std::vector<TString>(); + } + virtual bool NeedConstruction() const override { + return false; + } public: std::vector<TPortionInfo> PortionsToDrop; + bool NeedRepeat = false; + virtual THashMap<TUnifiedBlobId, std::vector<TBlobRange>> GetGroupedBlobRanges() const override { return {}; } diff --git a/ydb/core/tx/columnshard/engines/changes/in_granule_compaction.cpp b/ydb/core/tx/columnshard/engines/changes/in_granule_compaction.cpp new file mode 100644 index 00000000000..d50e17de65a --- /dev/null +++ b/ydb/core/tx/columnshard/engines/changes/in_granule_compaction.cpp @@ -0,0 +1,68 @@ +#include "in_granule_compaction.h" + +namespace NKikimr::NOlap { + +std::pair<std::shared_ptr<arrow::RecordBatch>, TSnapshot> TInGranuleCompactColumnEngineChanges::CompactInOneGranule(ui64 granule, + const std::vector<TPortionInfo>& portions, + const THashMap<TBlobRange, TString>& blobs, TConstructionContext& context) const { + std::vector<std::shared_ptr<arrow::RecordBatch>> batches; + batches.reserve(portions.size()); + + auto resultSchema = context.SchemaVersions.GetLastSchema(); + + TSnapshot maxSnapshot = resultSchema->GetSnapshot(); + for (auto& portionInfo : portions) { + Y_VERIFY(!portionInfo.Empty()); + Y_VERIFY(portionInfo.Granule() == granule); + auto blobSchema = context.SchemaVersions.GetSchema(portionInfo.GetSnapshot()); + auto batch = portionInfo.AssembleInBatch(*blobSchema, *resultSchema, blobs); + batches.push_back(batch); + if (portionInfo.GetSnapshot() > maxSnapshot) { + maxSnapshot = portionInfo.GetSnapshot(); + } + } + + auto sortedBatch = NArrow::CombineSortedBatches(batches, resultSchema->GetIndexInfo().SortReplaceDescription()); + Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(sortedBatch, resultSchema->GetIndexInfo().GetReplaceKey())); + + return std::make_pair(sortedBatch, maxSnapshot); +} + +TConclusion<std::vector<TString>> 
TInGranuleCompactColumnEngineChanges::DoConstructBlobs(TConstructionContext& context) noexcept { + const ui64 pathId = SrcGranule->PathId; + std::vector<TString> blobs; + auto& switchedPortions = SwitchedPortions; + Y_VERIFY(switchedPortions.size()); + + ui64 granule = switchedPortions[0].Granule(); + auto [batch, maxSnapshot] = CompactInOneGranule(granule, switchedPortions, Blobs, context); + + auto resultSchema = context.SchemaVersions.GetLastSchema(); + std::vector<TPortionInfo> portions; + if (!MergeBorders.Empty()) { + Y_VERIFY(MergeBorders.GetOrderedMarks().size() > 1); + auto slices = MergeBorders.SliceIntoGranules(batch, resultSchema->GetIndexInfo()); + portions.reserve(slices.size()); + + for (auto& [_, slice] : slices) { + if (!slice || slice->num_rows() == 0) { + continue; + } + auto tmp = MakeAppendedPortions(pathId, slice, granule, maxSnapshot, blobs, GetGranuleMeta(), context); + for (auto&& portionInfo : tmp) { + portions.emplace_back(std::move(portionInfo)); + } + } + } else { + portions = MakeAppendedPortions(pathId, batch, granule, maxSnapshot, blobs, GetGranuleMeta(), context); + } + + Y_VERIFY(portions.size() > 0); + Y_VERIFY(AppendedPortions.empty()); + // Set appended portions. 
+ AppendedPortions.swap(portions); + + return blobs; +} + +} diff --git a/ydb/core/tx/columnshard/engines/changes/in_granule_compaction.h b/ydb/core/tx/columnshard/engines/changes/in_granule_compaction.h new file mode 100644 index 00000000000..5a1cba91c9f --- /dev/null +++ b/ydb/core/tx/columnshard/engines/changes/in_granule_compaction.h @@ -0,0 +1,20 @@ +#pragma once +#include "compaction.h" + +namespace NKikimr::NOlap { + +class TGranuleMeta; + +class TInGranuleCompactColumnEngineChanges: public TCompactColumnEngineChanges { +private: + using TBase = TCompactColumnEngineChanges; + std::pair<std::shared_ptr<arrow::RecordBatch>, TSnapshot> CompactInOneGranule(ui64 granule, + const std::vector<TPortionInfo>& portions, const THashMap<TBlobRange, TString>& blobs, + TConstructionContext& context) const; +protected: + virtual TConclusion<std::vector<TString>> DoConstructBlobs(TConstructionContext& context) noexcept override; +public: + using TBase::TBase; +}; + +} diff --git a/ydb/core/tx/columnshard/engines/changes/indexation.cpp b/ydb/core/tx/columnshard/engines/changes/indexation.cpp index 2e45057cf8f..83b6f562196 100644 --- a/ydb/core/tx/columnshard/engines/changes/indexation.cpp +++ b/ydb/core/tx/columnshard/engines/changes/indexation.cpp @@ -69,4 +69,85 @@ void TInsertColumnEngineChanges::DoOnFinish(NColumnShard::TColumnShard& self, TC self.BackgroundController.FinishIndexing(); } +NKikimr::TConclusion<std::vector<TString>> TInsertColumnEngineChanges::DoConstructBlobs(TConstructionContext& context) noexcept { + Y_VERIFY(!DataToIndex.empty()); + Y_VERIFY(AppendedPortions.empty()); + + auto maxSnapshot = TSnapshot::Zero(); + for (auto& inserted : DataToIndex) { + TSnapshot insertSnap = inserted.GetSnapshot(); + Y_VERIFY(insertSnap.Valid()); + if (insertSnap > maxSnapshot) { + maxSnapshot = insertSnap; + } + } + Y_VERIFY(maxSnapshot.Valid()); + + auto resultSchema = context.SchemaVersions.GetSchema(maxSnapshot); + Y_VERIFY(resultSchema->GetIndexInfo().IsSorted()); + 
+ THashMap<ui64, std::vector<std::shared_ptr<arrow::RecordBatch>>> pathBatches; + for (auto& inserted : DataToIndex) { + TBlobRange blobRange(inserted.BlobId, 0, inserted.BlobId.BlobSize()); + + auto blobSchema = context.SchemaVersions.GetSchema(inserted.GetSchemaSnapshot()); + auto& indexInfo = blobSchema->GetIndexInfo(); + Y_VERIFY(indexInfo.IsSorted()); + + std::shared_ptr<arrow::RecordBatch> batch; + if (auto it = CachedBlobs.find(inserted.BlobId); it != CachedBlobs.end()) { + batch = it->second; + } else if (auto* blobData = Blobs.FindPtr(blobRange)) { + Y_VERIFY(!blobData->empty(), "Blob data not present"); + // Prepare batch + batch = NArrow::DeserializeBatch(*blobData, indexInfo.ArrowSchema()); + if (!batch) { + AFL_ERROR(NKikimrServices::TX_COLUMNSHARD) + ("event", "cannot_parse") + ("data_snapshot", TStringBuilder() << inserted.GetSnapshot()) + ("index_snapshot", TStringBuilder() << blobSchema->GetSnapshot()); + } + } else { + Y_VERIFY(blobData, "Data for range %s has not been read", blobRange.ToString().c_str()); + } + Y_VERIFY(batch); + + batch = AddSpecials(batch, blobSchema->GetIndexInfo(), inserted); + batch = resultSchema->NormalizeBatch(*blobSchema, batch); + pathBatches[inserted.PathId].push_back(batch); + Y_VERIFY_DEBUG(NArrow::IsSorted(pathBatches[inserted.PathId].back(), resultSchema->GetIndexInfo().GetReplaceKey())); + } + std::vector<TString> blobs; + + for (auto& [pathId, batches] : pathBatches) { + AddPathIfNotExists(pathId); + + // We could merge data here cause tablet limits indexing data portions + auto merged = NArrow::CombineSortedBatches(batches, resultSchema->GetIndexInfo().SortReplaceDescription()); + Y_VERIFY(merged); + Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(merged, resultSchema->GetIndexInfo().GetReplaceKey())); + + auto granuleBatches = TMarksGranules::SliceIntoGranules(merged, PathToGranule[pathId], resultSchema->GetIndexInfo()); + for (auto& [granule, batch] : granuleBatches) { + auto portions = MakeAppendedPortions(pathId, 
batch, granule, maxSnapshot, blobs, GetGranuleMeta(), context); + Y_VERIFY(portions.size() > 0); + for (auto& portion : portions) { + AppendedPortions.emplace_back(std::move(portion)); + } + } + } + + Y_VERIFY(PathToGranule.size() == pathBatches.size()); + return blobs; +} + +std::shared_ptr<arrow::RecordBatch> TInsertColumnEngineChanges::AddSpecials(const std::shared_ptr<arrow::RecordBatch>& srcBatch, + const TIndexInfo& indexInfo, const TInsertedData& inserted) const +{ + auto batch = TIndexInfo::AddSpecialColumns(srcBatch, inserted.GetSnapshot()); + Y_VERIFY(batch); + + return NArrow::ExtractColumns(batch, indexInfo.ArrowSchemaWithSpecials()); +} + } diff --git a/ydb/core/tx/columnshard/engines/changes/indexation.h b/ydb/core/tx/columnshard/engines/changes/indexation.h index 852583c730d..77fc125dd08 100644 --- a/ydb/core/tx/columnshard/engines/changes/indexation.h +++ b/ydb/core/tx/columnshard/engines/changes/indexation.h @@ -9,12 +9,16 @@ namespace NKikimr::NOlap { class TInsertColumnEngineChanges: public TChangesWithAppend { private: using TBase = TChangesWithAppend; + std::shared_ptr<arrow::RecordBatch> AddSpecials(const std::shared_ptr<arrow::RecordBatch>& srcBatch, + const TIndexInfo& indexInfo, const TInsertedData& inserted) const; + protected: virtual void DoStart(NColumnShard::TColumnShard& self) override; virtual void DoWriteIndexComplete(NColumnShard::TColumnShard& self, TWriteIndexCompleteContext& context) override; virtual bool DoApplyChanges(TColumnEngineForLogs& self, TApplyChangesContext& context, const bool dryRun) override; virtual void DoOnFinish(NColumnShard::TColumnShard& self, TChangesFinishContext& context) override; virtual void DoWriteIndex(NColumnShard::TColumnShard& self, TWriteIndexContext& context) override; + virtual TConclusion<std::vector<TString>> DoConstructBlobs(TConstructionContext& context) noexcept override; public: const TMark DefaultMark; THashMap<ui64, std::vector<std::pair<TMark, ui64>>> PathToGranule; // pathId -> {mark, 
granule} diff --git a/ydb/core/tx/columnshard/engines/changes/mark_granules.cpp b/ydb/core/tx/columnshard/engines/changes/mark_granules.cpp index 5530e2bfa75..57ede5644a6 100644 --- a/ydb/core/tx/columnshard/engines/changes/mark_granules.cpp +++ b/ydb/core/tx/columnshard/engines/changes/mark_granules.cpp @@ -1,5 +1,4 @@ #include "mark_granules.h" -#include <ydb/core/tx/columnshard/engines/index_logic_logs.h> namespace NKikimr::NOlap { TMarksGranules::TMarksGranules(std::vector<TPair>&& marks) noexcept diff --git a/ydb/core/tx/columnshard/engines/changes/split_compaction.cpp b/ydb/core/tx/columnshard/engines/changes/split_compaction.cpp new file mode 100644 index 00000000000..cf19d14241b --- /dev/null +++ b/ydb/core/tx/columnshard/engines/changes/split_compaction.cpp @@ -0,0 +1,391 @@ +#include "split_compaction.h" + +namespace NKikimr::NOlap { + +TConclusion<std::vector<TString>> TSplitCompactColumnEngineChanges::DoConstructBlobs(TConstructionContext& context) noexcept { + const ui64 pathId = SrcGranule->PathId; + const TMark ts0 = SrcGranule->Mark; + std::vector<TPortionInfo>& portions = SwitchedPortions; + + std::vector<std::pair<TMark, ui64>> tsIds; + ui64 movedRows = TryMovePortions(ts0, portions, tsIds, PortionsToMove, context); + auto [srcBatches, maxSnapshot] = PortionsToBatches(portions, Blobs, movedRows != 0, context); + Y_VERIFY(srcBatches.size() == portions.size()); + + std::vector<TString> blobs; + auto resultSchema = context.SchemaVersions.GetLastSchema(); + if (movedRows) { + Y_VERIFY(PortionsToMove.size() >= 2); + Y_VERIFY(PortionsToMove.size() == tsIds.size()); + Y_VERIFY(tsIds.begin()->first == ts0); + + // Calculate total number of rows. 
+ ui64 numRows = movedRows; + for (const auto& batch : srcBatches) { + numRows += batch->num_rows(); + } + + // Recalculate new granules borders (if they are larger than portions) + ui32 numSplitInto = NumSplitInto(numRows); + if (numSplitInto < tsIds.size()) { + const ui32 rowsInGranule = numRows / numSplitInto; + Y_VERIFY(rowsInGranule); + + std::vector<std::pair<TMark, ui64>> newTsIds; + ui32 tmpGranule = 0; + ui32 sumRows = 0; + // Always insert mark of the source granule at the beginning. + newTsIds.emplace_back(ts0, 1); + + for (size_t i = 0, end = tsIds.size(); i != end; ++i) { + const TMark& ts = tsIds[i].first; + // Make new granule if the current number of rows exceeds the allowed number of rows in the granule + // or there is the end of the ids and nothing was inserted so far. + if (sumRows >= rowsInGranule || (i + 1 == end && newTsIds.size() == 1)) { + ++tmpGranule; + newTsIds.emplace_back(ts, tmpGranule + 1); + sumRows = 0; + } + + auto& toMove = PortionsToMove[i]; + sumRows += toMove.first.NumRows(); + toMove.second = tmpGranule; + } + + tsIds.swap(newTsIds); + } + Y_VERIFY(tsIds.size() > 1); + Y_VERIFY(tsIds[0] == std::make_pair(ts0, ui64(1))); + TMarksGranules marksGranules(std::move(tsIds)); + + // Slice inserted portions with granules' borders + THashMap<ui64, std::vector<std::shared_ptr<arrow::RecordBatch>>> idBatches; + std::vector<TPortionInfo*> toSwitch; + toSwitch.reserve(portions.size()); + for (size_t i = 0; i < portions.size(); ++i) { + auto& portion = portions[i]; + auto& batch = srcBatches[i]; + auto slices = marksGranules.SliceIntoGranules(batch, resultSchema->GetIndexInfo()); + + THashSet<ui64> ids; + for (auto& [id, slice] : slices) { + if (slice && slice->num_rows()) { + ids.insert(id); + idBatches[id].emplace_back(std::move(slice)); + } + } + + // Optimization: move non-split inserted portions. Do not reappend them. 
+ if (ids.size() == 1) { + ui64 id = *ids.begin(); + idBatches[id].resize(idBatches[id].size() - 1); + ui64 tmpGranule = id - 1; + PortionsToMove.emplace_back(std::move(portion), tmpGranule); + } else { + toSwitch.push_back(&portion); + } + } + + // Update switchedPortions if we have moves + if (toSwitch.size() != portions.size()) { + std::vector<TPortionInfo> tmp; + tmp.reserve(toSwitch.size()); + for (auto* portionInfo : toSwitch) { + tmp.emplace_back(std::move(*portionInfo)); + } + portions.swap(tmp); + } + + for (const auto& [mark, id] : marksGranules.GetOrderedMarks()) { + ui64 tmpGranule = SetTmpGranule(pathId, mark); + for (const auto& batch : idBatches[id]) { + // Cannot set snapshot here. It would be set in committing transaction in ApplyChanges(). + auto newPortions = MakeAppendedPortions(pathId, batch, tmpGranule, maxSnapshot, blobs, GetGranuleMeta(), context); + Y_VERIFY(newPortions.size() > 0); + for (auto& portion : newPortions) { + AppendedPortions.emplace_back(std::move(portion)); + } + } + } + } else { + auto batches = SliceGranuleBatches(resultSchema->GetIndexInfo(), srcBatches, ts0); + + SetTmpGranule(pathId, ts0); + for (auto& [ts, batch] : batches) { + // Tmp granule would be updated to correct value in ApplyChanges() + ui64 tmpGranule = SetTmpGranule(pathId, ts); + + // Cannot set snapshot here. It would be set in committing transaction in ApplyChanges(). 
+ auto portions = MakeAppendedPortions(pathId, batch, tmpGranule, maxSnapshot, blobs, GetGranuleMeta(), context); + Y_VERIFY(portions.size() > 0); + for (auto& portion : portions) { + AppendedPortions.emplace_back(std::move(portion)); + } + } + } + + return blobs; +} + +std::pair<std::vector<std::shared_ptr<arrow::RecordBatch>>, NKikimr::NOlap::TSnapshot> TSplitCompactColumnEngineChanges::PortionsToBatches( + const std::vector<TPortionInfo>& portions, const THashMap<TBlobRange, TString>& blobs, const bool insertedOnly, TConstructionContext& context) { + std::vector<std::shared_ptr<arrow::RecordBatch>> batches; + batches.reserve(portions.size()); + const auto resultSchema = context.SchemaVersions.GetLastSchema(); + TSnapshot maxSnapshot = resultSchema->GetSnapshot(); + for (const auto& portionInfo : portions) { + if (!insertedOnly || portionInfo.IsInserted()) { + const auto blobSchema = context.SchemaVersions.GetSchema(portionInfo.GetSnapshot()); + + batches.push_back(portionInfo.AssembleInBatch(*blobSchema, *resultSchema, blobs)); + + if (maxSnapshot < portionInfo.GetSnapshot()) { + maxSnapshot = portionInfo.GetSnapshot(); + } + } + } + return std::make_pair(std::move(batches), maxSnapshot); +} + +std::vector<std::pair<NKikimr::NOlap::TMark, std::shared_ptr<arrow::RecordBatch>>> TSplitCompactColumnEngineChanges::SliceGranuleBatches(const TIndexInfo& indexInfo, + const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches, const TMark& ts0) { + std::vector<std::pair<TMark, std::shared_ptr<arrow::RecordBatch>>> out; + + // Extract unique effective keys and their counts + i64 numRows = 0; + TMap<NArrow::TReplaceKey, ui32> uniqKeyCount; + for (const auto& batch : batches) { + Y_VERIFY(batch); + if (batch->num_rows() == 0) { + continue; + } + + numRows += batch->num_rows(); + + const auto effKey = TMarksGranules::GetEffectiveKey(batch, indexInfo); + Y_VERIFY(effKey->num_columns() && effKey->num_rows()); + + auto effColumns = 
std::make_shared<NArrow::TArrayVec>(effKey->columns()); + for (int row = 0; row < effKey->num_rows(); ++row) { + ++uniqKeyCount[NArrow::TReplaceKey(effColumns, row)]; + } + } + + Y_VERIFY(uniqKeyCount.size()); + auto minTs = uniqKeyCount.begin()->first; + auto maxTs = uniqKeyCount.rbegin()->first; + Y_VERIFY(minTs >= ts0.GetBorder()); + + // It's an estimation of needed count cause numRows calculated before key replaces + ui32 rowsInGranule = numRows / NumSplitInto(numRows); + Y_VERIFY(rowsInGranule); + + // Cannot split in case of one unique key + if (uniqKeyCount.size() == 1) { + // We have to split big batch of same key in several portions + auto merged = NArrow::MergeSortedBatches(batches, indexInfo.SortReplaceDescription(), rowsInGranule); + for (auto& batch : merged) { + Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(batch, indexInfo.GetReplaceKey())); + out.emplace_back(ts0, batch); + } + return out; + } + + // Make split borders from uniq keys + std::vector<NArrow::TReplaceKey> borders; + borders.reserve(numRows / rowsInGranule); + { + ui32 sumRows = 0; + for (auto& [ts, num] : uniqKeyCount) { + if (sumRows >= rowsInGranule) { + borders.emplace_back(ts); + sumRows = 0; + } + sumRows += num; + } + if (borders.empty()) { + borders.emplace_back(maxTs); // huge trailing key + } + Y_VERIFY(borders.size()); + } + + // Find offsets in source batches + std::vector<std::vector<int>> offsets(batches.size()); // vec[batch][border] = offset + for (size_t i = 0; i < batches.size(); ++i) { + const auto& batch = batches[i]; + auto& batchOffsets = offsets[i]; + batchOffsets.reserve(borders.size() + 1); + + const auto effKey = TMarksGranules::GetEffectiveKey(batch, indexInfo); + Y_VERIFY(effKey->num_columns() && effKey->num_rows()); + + std::vector<NArrow::TRawReplaceKey> keys; + { + const auto& columns = effKey->columns(); + keys.reserve(effKey->num_rows()); + for (i64 i = 0; i < effKey->num_rows(); ++i) { + keys.emplace_back(NArrow::TRawReplaceKey(&columns, i)); + } + } + + 
batchOffsets.push_back(0); + for (const auto& border : borders) { + int offset = NArrow::LowerBound(keys, border, batchOffsets.back()); + Y_VERIFY(offset >= batchOffsets.back()); + Y_VERIFY(offset <= batch->num_rows()); + batchOffsets.push_back(offset); + } + + Y_VERIFY(batchOffsets.size() == borders.size() + 1); + } + + // Make merge-sorted granule batch for each split granule + for (ui32 granuleNo = 0; granuleNo < borders.size() + 1; ++granuleNo) { + std::vector<std::shared_ptr<arrow::RecordBatch>> granuleBatches; + granuleBatches.reserve(batches.size()); + const bool lastGranule = (granuleNo == borders.size()); + + // Extract granule: slice source batches with offsets + i64 granuleNumRows = 0; + for (size_t i = 0; i < batches.size(); ++i) { + const auto& batch = batches[i]; + auto& batchOffsets = offsets[i]; + + int offset = batchOffsets[granuleNo]; + int end = lastGranule ? batch->num_rows() : batchOffsets[granuleNo + 1]; + int size = end - offset; + Y_VERIFY(size >= 0); + + if (size) { + Y_VERIFY(offset < batch->num_rows()); + auto slice = batch->Slice(offset, size); + Y_VERIFY(slice->num_rows()); + granuleNumRows += slice->num_rows(); +#if 1 // Check correctness + const auto effKey = TMarksGranules::GetEffectiveKey(slice, indexInfo); + Y_VERIFY(effKey->num_columns() && effKey->num_rows()); + + auto startKey = granuleNo ? borders[granuleNo - 1] : minTs; + Y_VERIFY(NArrow::TReplaceKey::FromBatch(effKey, 0) >= startKey); + + NArrow::TReplaceKey lastSliceKey = NArrow::TReplaceKey::FromBatch(effKey, effKey->num_rows() - 1); + if (granuleNo < borders.size() - 1) { + const auto& endKey = borders[granuleNo]; + Y_VERIFY(lastSliceKey < endKey); + } else { + Y_VERIFY(lastSliceKey <= maxTs); + } +#endif + Y_VERIFY_DEBUG(NArrow::IsSorted(slice, indexInfo.GetReplaceKey())); + granuleBatches.emplace_back(slice); + } + } + + // Merge slices. We may have to split big batches of the same key into several ones here. 
+ if (granuleNumRows > 4 * rowsInGranule) { + granuleNumRows = rowsInGranule; + } + auto merged = NArrow::MergeSortedBatches(granuleBatches, indexInfo.SortReplaceDescription(), granuleNumRows); + for (auto& batch : merged) { + Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(batch, indexInfo.GetReplaceKey())); + + auto startKey = ts0.GetBorder(); + if (granuleNo) { + startKey = borders[granuleNo - 1]; + } +#if 1 // Check correctness + const auto effKey = TMarksGranules::GetEffectiveKey(batch, indexInfo); + Y_VERIFY(effKey->num_columns() && effKey->num_rows()); + + Y_VERIFY(NArrow::TReplaceKey::FromBatch(effKey, 0) >= startKey); +#endif + out.emplace_back(TMark(startKey), batch); + } + } + + return out; +} + +ui64 TSplitCompactColumnEngineChanges::TryMovePortions(const TMark& ts0, std::vector<TPortionInfo>& portions, + std::vector<std::pair<TMark, ui64>>& tsIds, std::vector<std::pair<TPortionInfo, ui64>>& toMove, TConstructionContext& context) +{ + std::vector<TPortionInfo*> partitioned(portions.size()); + // Split portions by putting the inserted portions in the original order + // at the beginning of the buffer and the compacted portions at the end. + // The compacted portions will be put in the reversed order, but it will be sorted later. + const auto [inserted, compacted] = [&]() { + size_t l = 0; + size_t r = portions.size(); + + for (auto& portionInfo : portions) { + partitioned[(portionInfo.IsInserted() ? l++ : --r)] = &portionInfo; + } + + return std::make_tuple(std::span(partitioned.begin(), l), std::span(partitioned.begin() + l, partitioned.end())); + }(); + + context.Counters.AnalizeCompactedPortions->Add(compacted.size()); + context.Counters.AnalizeInsertedPortions->Add(inserted.size()); + for (auto&& i : portions) { + if (i.IsInserted()) { + context.Counters.AnalizeInsertedBytes->Add(i.BlobsBytes()); + } else { + context.Counters.AnalizeCompactedBytes->Add(i.BlobsBytes()); + } + } + + // Do nothing if there are less than two compacted portions. 
+ if (compacted.size() < 2) { + return 0; + } + // Order compacted portions by primary key. + std::sort(compacted.begin(), compacted.end(), [](const TPortionInfo* a, const TPortionInfo* b) { + return a->IndexKeyStart() < b->IndexKeyStart(); + }); + for (auto&& i : inserted) { + context.Counters.RepackedInsertedPortionBytes->Add(i->BlobsBytes()); + } + context.Counters.RepackedInsertedPortions->Add(inserted.size()); + + // Check that there are no gaps between two adjacent portions in term of primary key range. + for (size_t i = 0; i < compacted.size() - 1; ++i) { + if (compacted[i]->IndexKeyEnd() >= compacted[i + 1]->IndexKeyStart()) { + for (auto&& c : compacted) { + context.Counters.SkipPortionBytesMoveThroughIntersection->Add(c->BlobsBytes()); + } + + context.Counters.SkipPortionsMoveThroughIntersection->Add(compacted.size()); + context.Counters.RepackedCompactedPortions->Add(compacted.size()); + return 0; + } + } + + toMove.reserve(compacted.size()); + ui64 numRows = 0; + ui32 counter = 0; + for (auto* portionInfo : compacted) { + ui32 rows = portionInfo->NumRows(); + Y_VERIFY(rows); + numRows += rows; + context.Counters.MovedPortionBytes->Add(portionInfo->BlobsBytes()); + tsIds.emplace_back((counter ? TMark(portionInfo->IndexKeyStart()) : ts0), counter + 1); + toMove.emplace_back(std::move(*portionInfo), counter); + ++counter; + // Ensure that std::move will take an effect. + static_assert(std::swappable<decltype(*portionInfo)>); + } + context.Counters.MovedPortions->Add(toMove.size()); + + std::vector<TPortionInfo> out; + out.reserve(inserted.size()); + for (auto* portionInfo : inserted) { + out.emplace_back(std::move(*portionInfo)); + // Ensure that std::move will take an effect. 
+ static_assert(std::swappable<decltype(*portionInfo)>); + } + portions.swap(out); + + return numRows; +} + +} diff --git a/ydb/core/tx/columnshard/engines/changes/split_compaction.h b/ydb/core/tx/columnshard/engines/changes/split_compaction.h new file mode 100644 index 00000000000..aed71cf5630 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/changes/split_compaction.h @@ -0,0 +1,27 @@ +#pragma once +#include "compaction.h" + +namespace NKikimr::NOlap { + +class TSplitCompactColumnEngineChanges: public TCompactColumnEngineChanges { +private: + using TBase = TCompactColumnEngineChanges; + std::pair<std::vector<std::shared_ptr<arrow::RecordBatch>>, TSnapshot> PortionsToBatches(const std::vector<TPortionInfo>& portions, const THashMap<TBlobRange, TString>& blobs, + const bool insertedOnly, TConstructionContext& context); + + std::vector<std::pair<TMark, std::shared_ptr<arrow::RecordBatch>>> SliceGranuleBatches(const TIndexInfo& indexInfo, + const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches, + const TMark& ts0); + + ui64 TryMovePortions(const TMark& ts0, + std::vector<TPortionInfo>& portions, + std::vector<std::pair<TMark, ui64>>& tsIds, + std::vector<std::pair<TPortionInfo, ui64>>& toMove, TConstructionContext& context); + +protected: + virtual TConclusion<std::vector<TString>> DoConstructBlobs(TConstructionContext& context) noexcept override; +public: + using TBase::TBase; +}; + +} diff --git a/ydb/core/tx/columnshard/engines/changes/ttl.cpp b/ydb/core/tx/columnshard/engines/changes/ttl.cpp index ed29f8091f6..dc5cc467c67 100644 --- a/ydb/core/tx/columnshard/engines/changes/ttl.cpp +++ b/ydb/core/tx/columnshard/engines/changes/ttl.cpp @@ -169,4 +169,78 @@ void TTTLColumnEngineChanges::DoOnFinish(NColumnShard::TColumnShard& self, TChan self.BackgroundController.FinishTtl(); } +bool TTTLColumnEngineChanges::UpdateEvictedPortion(TPortionInfo& portionInfo, TPortionEvictionFeatures& evictFeatures, + const THashMap<TBlobRange, TString>& srcBlobs, 
std::vector<TColumnRecord>& evictedRecords, std::vector<TString>& newBlobs, + TConstructionContext& context) const { + Y_VERIFY(portionInfo.TierName != evictFeatures.TargetTierName); + + auto* tiering = context.GetTieringMap().FindPtr(evictFeatures.PathId); + Y_VERIFY(tiering); + auto compression = tiering->GetCompression(evictFeatures.TargetTierName); + if (!compression) { + // Nothing to recompress. We have no other kinds of evictions yet. + portionInfo.TierName = evictFeatures.TargetTierName; + evictFeatures.DataChanges = false; + return true; + } + + Y_VERIFY(!evictFeatures.NeedExport); + + TPortionInfo undo = portionInfo; + + auto blobSchema = context.SchemaVersions.GetSchema(undo.GetSnapshot()); + auto resultSchema = context.SchemaVersions.GetLastSchema(); + auto batch = portionInfo.AssembleInBatch(*blobSchema, *resultSchema, srcBlobs); + + size_t undoSize = newBlobs.size(); + TSaverContext saverContext; + saverContext.SetTierName(evictFeatures.TargetTierName).SetExternalCompression(compression); + for (auto& rec : portionInfo.Records) { + auto pos = resultSchema->GetFieldIndex(rec.ColumnId); + Y_VERIFY(pos >= 0); + auto field = resultSchema->GetFieldByIndex(pos); + auto columnSaver = resultSchema->GetColumnSaver(rec.ColumnId, saverContext); + + auto blob = TPortionInfo::SerializeColumn(batch->GetColumnByName(field->name()), field, columnSaver); + if (blob.size() >= TPortionInfo::BLOB_BYTES_LIMIT) { + portionInfo = undo; + newBlobs.resize(undoSize); + return false; + } + newBlobs.emplace_back(std::move(blob)); + rec.BlobRange = TBlobRange{}; + } + + for (auto& rec : undo.Records) { + evictedRecords.emplace_back(std::move(rec)); + } + + portionInfo.AddMetadata(*resultSchema, batch, evictFeatures.TargetTierName); + return true; +} + +NKikimr::TConclusion<std::vector<TString>> TTTLColumnEngineChanges::DoConstructBlobs(TConstructionContext& context) noexcept { + Y_VERIFY(!Blobs.empty()); // src data + Y_VERIFY(!PortionsToEvict.empty()); // src meta + 
Y_VERIFY(EvictedRecords.empty()); // dst meta + + std::vector<TString> newBlobs; + std::vector<std::pair<TPortionInfo, TPortionEvictionFeatures>> evicted; + evicted.reserve(PortionsToEvict.size()); + + for (auto& [portionInfo, evictFeatures] : PortionsToEvict) { + Y_VERIFY(!portionInfo.Empty()); + Y_VERIFY(portionInfo.IsActive()); + + if (UpdateEvictedPortion(portionInfo, evictFeatures, Blobs, + EvictedRecords, newBlobs, context)) { + Y_VERIFY(portionInfo.TierName == evictFeatures.TargetTierName); + evicted.emplace_back(std::move(portionInfo), evictFeatures); + } + } + + PortionsToEvict.swap(evicted); + return newBlobs; +} + } diff --git a/ydb/core/tx/columnshard/engines/changes/ttl.h b/ydb/core/tx/columnshard/engines/changes/ttl.h index ba740c08bf4..3562e9494ba 100644 --- a/ydb/core/tx/columnshard/engines/changes/ttl.h +++ b/ydb/core/tx/columnshard/engines/changes/ttl.h @@ -9,6 +9,10 @@ private: using TBase = TCleanupColumnEngineChanges; THashMap<TString, TPathIdBlobs> ExportTierBlobs; ui64 ExportNo = 0; + bool UpdateEvictedPortion(TPortionInfo& portionInfo, + TPortionEvictionFeatures& evictFeatures, const THashMap<TBlobRange, TString>& srcBlobs, + std::vector<TColumnRecord>& evictedRecords, std::vector<TString>& newBlobs, TConstructionContext& context) const; + protected: virtual void DoWriteIndexComplete(NColumnShard::TColumnShard& self, TWriteIndexCompleteContext& context) override; virtual void DoCompile(TFinalizationContext& context) override; @@ -17,6 +21,10 @@ protected: virtual bool DoApplyChanges(TColumnEngineForLogs& self, TApplyChangesContext& context, const bool dryRun) override; virtual void DoDebugString(TStringOutput& out) const override; virtual void DoWriteIndex(NColumnShard::TColumnShard& self, TWriteIndexContext& context) override; + virtual TConclusion<std::vector<TString>> DoConstructBlobs(TConstructionContext& context) noexcept override; + virtual bool NeedConstruction() const override { + return PortionsToEvict.size(); + } public: 
std::vector<TColumnRecord> EvictedRecords; std::vector<std::pair<TPortionInfo, TPortionEvictionFeatures>> PortionsToEvict; // {portion, TPortionEvictionFeatures} diff --git a/ydb/core/tx/columnshard/engines/changes/with_appended.cpp b/ydb/core/tx/columnshard/engines/changes/with_appended.cpp index 27e7738362c..ac01441408a 100644 --- a/ydb/core/tx/columnshard/engines/changes/with_appended.cpp +++ b/ydb/core/tx/columnshard/engines/changes/with_appended.cpp @@ -2,6 +2,7 @@ #include <ydb/core/tx/columnshard/blob_cache.h> #include <ydb/core/tx/columnshard/columnshard_impl.h> #include <ydb/core/tx/columnshard/engines/column_engine_logs.h> +#include <ydb/core/tx/columnshard/splitter/splitter.h> namespace NKikimr::NOlap { @@ -101,4 +102,47 @@ void TChangesWithAppend::DoCompile(TFinalizationContext& context) { } } +std::vector<NKikimr::NOlap::TPortionInfo> TChangesWithAppend::MakeAppendedPortions( + const ui64 pathId, const std::shared_ptr<arrow::RecordBatch> batch, const ui64 granule, const TSnapshot& snapshot, + std::vector<TString>& blobs, const TGranuleMeta* granuleMeta, TConstructionContext& context) const { + Y_VERIFY(batch->num_rows()); + + auto resultSchema = context.SchemaVersions.GetSchema(snapshot); + std::vector<TPortionInfo> out; + + TString tierName; + std::optional<NArrow::TCompression> compression; + if (pathId) { + if (auto* tiering = context.GetTieringMap().FindPtr(pathId)) { + tierName = tiering->GetHottestTierName(); + if (const auto& tierCompression = tiering->GetCompression(tierName)) { + compression = *tierCompression; + } + } + } + TSaverContext saverContext; + saverContext.SetTierName(tierName).SetExternalCompression(compression); + + TSplitLimiter limiter(granuleMeta, context.Counters, resultSchema, batch); + + std::vector<TString> portionBlobs; + std::shared_ptr<arrow::RecordBatch> portionBatch; + while (limiter.Next(portionBlobs, portionBatch, saverContext)) { + TPortionInfo portionInfo; + 
portionInfo.Records.reserve(resultSchema->GetSchema()->num_fields()); + for (auto&& f : resultSchema->GetSchema()->fields()) { + const ui32 columnId = resultSchema->GetIndexInfo().GetColumnId(f->name()); + TColumnRecord record = TColumnRecord::Make(granule, columnId, snapshot, 0); + portionInfo.AppendOneChunkColumn(std::move(record)); + } + for (auto&& i : portionBlobs) { + blobs.emplace_back(i); + } + portionInfo.AddMetadata(*resultSchema, portionBatch, tierName); + out.emplace_back(std::move(portionInfo)); + } + + return out; +} + } diff --git a/ydb/core/tx/columnshard/engines/changes/with_appended.h b/ydb/core/tx/columnshard/engines/changes/with_appended.h index 1a1b181a392..f258f9a834e 100644 --- a/ydb/core/tx/columnshard/engines/changes/with_appended.h +++ b/ydb/core/tx/columnshard/engines/changes/with_appended.h @@ -10,6 +10,12 @@ protected: virtual void DoCompile(TFinalizationContext& context) override; virtual bool DoApplyChanges(TColumnEngineForLogs& self, TApplyChangesContext& context, const bool dryRun) override; virtual void DoWriteIndex(NColumnShard::TColumnShard& self, TWriteIndexContext& context) override; + std::vector<TPortionInfo> MakeAppendedPortions(const ui64 pathId, + const std::shared_ptr<arrow::RecordBatch> batch, + const ui64 granule, + const TSnapshot& snapshot, + std::vector<TString>& blobs, const TGranuleMeta* granuleMeta, TConstructionContext& context) const; + public: std::vector<TPortionInfo> AppendedPortions; // New portions after indexing or compaction THashMap<ui64, std::pair<ui64, TMark>> NewGranules; // granule -> {pathId, key} diff --git a/ydb/core/tx/columnshard/engines/changes/ya.make b/ydb/core/tx/columnshard/engines/changes/ya.make index 462b1c11518..dced5b30854 100644 --- a/ydb/core/tx/columnshard/engines/changes/ya.make +++ b/ydb/core/tx/columnshard/engines/changes/ya.make @@ -4,6 +4,8 @@ SRCS( abstract.cpp mark.cpp compaction.cpp + in_granule_compaction.cpp + split_compaction.cpp ttl.cpp indexation.cpp cleanup.cpp @@ 
-16,6 +18,7 @@ PEERDIR( ydb/core/formats/arrow ydb/core/tx/columnshard/common ydb/core/tx/columnshard/engines/insert_table + ydb/core/tx/columnshard/splitter ydb/core/tablet_flat ydb/core/tx/tiering ydb/core/protos diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp index 5d2336daf95..9175a753612 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp @@ -1,12 +1,16 @@ #include "column_engine_logs.h" #include "filter.h" -#include "index_logic_logs.h" #include "indexed_read_data.h" #include <ydb/core/tx/columnshard/hooks/abstract/abstract.h> #include <ydb/core/formats/arrow/one_batch_input_stream.h> #include <ydb/core/formats/arrow/merging_sorted_input_stream.h> #include <ydb/library/conclusion/status.h> +#include "changes/indexation.h" +#include "changes/in_granule_compaction.h" +#include "changes/split_compaction.h" +#include "changes/cleanup.h" +#include "changes/ttl.h" #include <concepts> @@ -122,6 +126,35 @@ TConclusionStatus InitInGranuleMerge(const TMark& granuleMark, std::vector<TPort } // namespace +std::shared_ptr<NKikimr::NOlap::TCompactColumnEngineChanges> TColumnEngineForLogs::TChangesConstructor::BuildCompactionChanges(std::unique_ptr<TCompactionInfo>&& info, + const TCompactionLimits& limits, const TSnapshot& initSnapshot) { + std::shared_ptr<TCompactColumnEngineChanges> result; + if (info->InGranule()) { + result = std::make_shared<TInGranuleCompactColumnEngineChanges>(limits, std::move(info)); + } else { + result = std::make_shared<TSplitCompactColumnEngineChanges>(limits, std::move(info)); + } + result->InitSnapshot = initSnapshot; + return result; +} + +std::shared_ptr<NKikimr::NOlap::TCleanupColumnEngineChanges> TColumnEngineForLogs::TChangesConstructor::BuildCleanupChanges(const TSnapshot& initSnapshot) { + auto changes = std::make_shared<TCleanupColumnEngineChanges>(); + changes->InitSnapshot = 
initSnapshot; + return changes; +} + +std::shared_ptr<NKikimr::NOlap::TTTLColumnEngineChanges> TColumnEngineForLogs::TChangesConstructor::BuildTtlChanges() { + return std::make_shared<TTTLColumnEngineChanges>(); +} + +std::shared_ptr<NKikimr::NOlap::TInsertColumnEngineChanges> TColumnEngineForLogs::TChangesConstructor::BuildInsertChanges(const TMark& defaultMark, std::vector<NOlap::TInsertedData>&& blobsToIndex, const TSnapshot& initSnapshot) { + auto changes = std::make_shared<TInsertColumnEngineChanges>(defaultMark); + changes->DataToIndex = std::move(blobsToIndex); + changes->InitSnapshot = initSnapshot; + return changes; +} + TColumnEngineForLogs::TColumnEngineForLogs(ui64 tabletId, const TCompactionLimits& limits) : GranulesStorage(std::make_shared<TGranulesStorage>(SignalCounters, limits)) , TabletId(tabletId) diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.h b/ydb/core/tx/columnshard/engines/column_engine_logs.h index e1fa5a799bd..54192815f65 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.h +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.h @@ -6,10 +6,6 @@ #include <ydb/core/tx/columnshard/counters/engine_logs.h> #include "storage/granule.h" #include "storage/storage.h" -#include "changes/indexation.h" -#include "changes/compaction.h" -#include "changes/cleanup.h" -#include "changes/ttl.h" namespace NKikimr::NArrow { struct TSortDescription; @@ -40,29 +36,14 @@ public: class TChangesConstructor : public TColumnEngineChanges { public: static std::shared_ptr<TInsertColumnEngineChanges> BuildInsertChanges(const TMark& defaultMark, - std::vector<NOlap::TInsertedData>&& blobsToIndex, const TSnapshot& initSnapshot) { - auto changes = std::make_shared<TInsertColumnEngineChanges>(defaultMark); - changes->DataToIndex = std::move(blobsToIndex); - changes->InitSnapshot = initSnapshot; - return changes; - } + std::vector<NOlap::TInsertedData>&& blobsToIndex, const TSnapshot& initSnapshot); static 
std::shared_ptr<TCompactColumnEngineChanges> BuildCompactionChanges(std::unique_ptr<TCompactionInfo>&& info, - const TCompactionLimits& limits, const TSnapshot& initSnapshot) { - auto changes = std::make_shared<TCompactColumnEngineChanges>(limits, std::move(info)); - changes->InitSnapshot = initSnapshot; - return changes; - } + const TCompactionLimits& limits, const TSnapshot& initSnapshot); - static std::shared_ptr<TCleanupColumnEngineChanges> BuildCleanupChanges(const TSnapshot& initSnapshot) { - auto changes = std::make_shared<TCleanupColumnEngineChanges>(); - changes->InitSnapshot = initSnapshot; - return changes; - } + static std::shared_ptr<TCleanupColumnEngineChanges> BuildCleanupChanges(const TSnapshot& initSnapshot); - static std::shared_ptr<TTTLColumnEngineChanges> BuildTtlChanges() { - return std::make_shared<TTTLColumnEngineChanges>(); - } + static std::shared_ptr<TTTLColumnEngineChanges> BuildTtlChanges(); }; enum ETableIdx { diff --git a/ydb/core/tx/columnshard/engines/index_logic_logs.cpp b/ydb/core/tx/columnshard/engines/index_logic_logs.cpp deleted file mode 100644 index ab3eb26d9b3..00000000000 --- a/ydb/core/tx/columnshard/engines/index_logic_logs.cpp +++ /dev/null @@ -1,803 +0,0 @@ -#include "index_logic_logs.h" - -#include <span> - -namespace NKikimr::NOlap { - -std::shared_ptr<arrow::RecordBatch> TIndexationLogic::AddSpecials(const std::shared_ptr<arrow::RecordBatch>& srcBatch, - const TIndexInfo& indexInfo, const TInsertedData& inserted) const { - auto batch = TIndexInfo::AddSpecialColumns(srcBatch, inserted.GetSnapshot()); - Y_VERIFY(batch); - - return NArrow::ExtractColumns(batch, indexInfo.ArrowSchemaWithSpecials()); -} - -bool TEvictionLogic::UpdateEvictedPortion(TPortionInfo& portionInfo, - TPortionEvictionFeatures& evictFeatures, const THashMap<TBlobRange, TString>& srcBlobs, - std::vector<TColumnRecord>& evictedRecords, std::vector<TString>& newBlobs) const { - Y_VERIFY(portionInfo.TierName != evictFeatures.TargetTierName); - - auto* 
tiering = GetTieringMap().FindPtr(evictFeatures.PathId); - Y_VERIFY(tiering); - auto compression = tiering->GetCompression(evictFeatures.TargetTierName); - if (!compression) { - // Noting to recompress. We have no other kinds of evictions yet. - portionInfo.TierName = evictFeatures.TargetTierName; - evictFeatures.DataChanges = false; - return true; - } - - Y_VERIFY(!evictFeatures.NeedExport); - - TPortionInfo undo = portionInfo; - - auto blobSchema = SchemaVersions.GetSchema(undo.GetSnapshot()); - auto resultSchema = SchemaVersions.GetLastSchema(); - auto batch = portionInfo.AssembleInBatch(*blobSchema, *resultSchema, srcBlobs); - - size_t undoSize = newBlobs.size(); - TSaverContext saverContext; - saverContext.SetTierName(evictFeatures.TargetTierName).SetExternalCompression(compression); - for (auto& rec : portionInfo.Records) { - auto pos = resultSchema->GetFieldIndex(rec.ColumnId); - Y_VERIFY(pos >= 0); - auto field = resultSchema->GetFieldByIndex(pos); - auto columnSaver = resultSchema->GetColumnSaver(rec.ColumnId, saverContext); - - auto blob = TPortionInfo::SerializeColumn(batch->GetColumnByName(field->name()), field, columnSaver); - if (blob.size() >= TPortionInfo::BLOB_BYTES_LIMIT) { - portionInfo = undo; - newBlobs.resize(undoSize); - return false; - } - newBlobs.emplace_back(std::move(blob)); - rec.BlobRange = TBlobRange{}; - } - - for (auto& rec : undo.Records) { - evictedRecords.emplace_back(std::move(rec)); - } - - portionInfo.AddMetadata(*resultSchema, batch, evictFeatures.TargetTierName); - return true; -} - -class TSplitLimiter { -private: - static const inline double ReduceCorrectionKff = 0.9; - static const inline double IncreaseCorrectionKff = 1.1; - static const inline ui64 ExpectedBlobSize = 6 * 1024 * 1024; - static const inline ui64 MinBlobSize = 1 * 1024 * 1024; - - const NColumnShard::TIndexationCounters Counters; - ui32 BaseStepRecordsCount = 0; - ui32 CurrentStepRecordsCount = 0; - std::shared_ptr<arrow::RecordBatch> Batch; - 
std::vector<TColumnSummary> SortedColumnIds; - ui32 Position = 0; - ISnapshotSchema::TPtr Schema; -public: - TSplitLimiter(const TGranuleMeta* granuleMeta, const NColumnShard::TIndexationCounters& counters, - ISnapshotSchema::TPtr schema, const std::shared_ptr<arrow::RecordBatch> batch) - : Counters(counters) - , Batch(batch) - , Schema(schema) - { - if (granuleMeta && granuleMeta->GetAdditiveSummary().GetOther().GetRecordsCount()) { - Y_VERIFY(granuleMeta->GetHardSummary().GetColumnIdsSortedBySizeDescending().size()); - SortedColumnIds = granuleMeta->GetHardSummary().GetColumnIdsSortedBySizeDescending(); - const auto biggestColumn = SortedColumnIds.front(); - Y_VERIFY(biggestColumn.GetPackedBlobsSize()); - const double expectedPackedRecordSize = 1.0 * biggestColumn.GetPackedBlobsSize() / granuleMeta->GetAdditiveSummary().GetOther().GetRecordsCount(); - BaseStepRecordsCount = ExpectedBlobSize / expectedPackedRecordSize; - for (ui32 i = 1; i < SortedColumnIds.size(); ++i) { - Y_VERIFY(SortedColumnIds[i - 1].GetPackedBlobsSize() >= SortedColumnIds[i].GetPackedBlobsSize()); - } - if (BaseStepRecordsCount > batch->num_rows()) { - BaseStepRecordsCount = batch->num_rows(); - } else { - BaseStepRecordsCount = batch->num_rows() / (ui32)(batch->num_rows() / BaseStepRecordsCount); - if (BaseStepRecordsCount * expectedPackedRecordSize > TCompactionLimits::MAX_BLOB_SIZE) { - BaseStepRecordsCount = ExpectedBlobSize / expectedPackedRecordSize; - } - } - } else { - for (auto&& i : Schema->GetIndexInfo().GetColumnIds()) { - SortedColumnIds.emplace_back(TColumnSummary(i)); - } - BaseStepRecordsCount = batch->num_rows(); - } - BaseStepRecordsCount = std::min<ui32>(BaseStepRecordsCount, Batch->num_rows()); - Y_VERIFY(BaseStepRecordsCount); - CurrentStepRecordsCount = BaseStepRecordsCount; - } - - bool Next(std::vector<TString>& portionBlobs, std::shared_ptr<arrow::RecordBatch>& batch, const TSaverContext& saverContext) { - if (Position == Batch->num_rows()) { - return false; - } - - 
portionBlobs.resize(Schema->GetSchema()->num_fields()); - while (true) { - Y_VERIFY(Position < Batch->num_rows()); - std::shared_ptr<arrow::RecordBatch> currentBatch; - if (Batch->num_rows() - Position < CurrentStepRecordsCount * 1.1) { - currentBatch = Batch->Slice(Position, Batch->num_rows() - Position); - } else { - currentBatch = Batch->Slice(Position, CurrentStepRecordsCount); - } - - ui32 fillCounter = 0; - for (const auto& columnSummary : SortedColumnIds) { - const TString& columnName = Schema->GetIndexInfo().GetColumnName(columnSummary.GetColumnId()); - const int idx = Schema->GetFieldIndex(columnSummary.GetColumnId()); - Y_VERIFY(idx >= 0); - auto field = Schema->GetFieldByIndex(idx); - Y_VERIFY(field); - auto array = currentBatch->GetColumnByName(columnName); - Y_VERIFY(array); - auto columnSaver = Schema->GetColumnSaver(columnSummary.GetColumnId(), saverContext); - TString blob = TPortionInfo::SerializeColumn(array, field, columnSaver); - if (blob.size() >= TCompactionLimits::MAX_BLOB_SIZE) { - Counters.TrashDataSerializationBytes->Add(blob.size()); - Counters.TrashDataSerialization->Add(1); - Counters.TrashDataSerializationHistogramBytes->Collect(blob.size()); - const double kffNew = 1.0 * ExpectedBlobSize / blob.size() * ReduceCorrectionKff; - CurrentStepRecordsCount = currentBatch->num_rows() * kffNew; - Y_VERIFY(CurrentStepRecordsCount); - break; - } else { - Counters.CorrectDataSerializationBytes->Add(blob.size()); - Counters.CorrectDataSerialization->Add(1); - } - - portionBlobs[idx] = std::move(blob); - ++fillCounter; - } - - if (fillCounter == portionBlobs.size()) { - Y_VERIFY(fillCounter == portionBlobs.size()); - Position += currentBatch->num_rows(); - Y_VERIFY(Position <= Batch->num_rows()); - ui64 maxBlobSize = 0; - for (auto&& i : portionBlobs) { - Counters.SplittedPortionColumnSize->Collect(i.size()); - maxBlobSize = std::max<ui64>(maxBlobSize, i.size()); - } - batch = currentBatch; - if (maxBlobSize < MinBlobSize) { - if ((Position != 
currentBatch->num_rows() || Position != Batch->num_rows())) { - Counters.SplittedPortionLargestColumnSize->Collect(maxBlobSize); - Counters.TooSmallBlob->Add(1); - if (Position == Batch->num_rows()) { - Counters.TooSmallBlobFinish->Add(1); - } - if (Position == currentBatch->num_rows()) { - Counters.TooSmallBlobStart->Add(1); - } - } else { - Counters.SimpleSplitPortionLargestColumnSize->Collect(maxBlobSize); - } - CurrentStepRecordsCount = currentBatch->num_rows() * IncreaseCorrectionKff; - } else { - Counters.SplittedPortionLargestColumnSize->Collect(maxBlobSize); - } - return true; - } - } - } -}; - -std::vector<TPortionInfo> TIndexLogicBase::MakeAppendedPortions(const ui64 pathId, - const std::shared_ptr<arrow::RecordBatch> batch, - const ui64 granule, - const TSnapshot& snapshot, - std::vector<TString>& blobs, const TGranuleMeta* granuleMeta) const { - Y_VERIFY(batch->num_rows()); - - auto resultSchema = SchemaVersions.GetSchema(snapshot); - std::vector<TPortionInfo> out; - - TString tierName; - std::optional<NArrow::TCompression> compression; - if (pathId) { - if (auto* tiering = GetTieringMap().FindPtr(pathId)) { - tierName = tiering->GetHottestTierName(); - if (const auto& tierCompression = tiering->GetCompression(tierName)) { - compression = *tierCompression; - } - } - } - TSaverContext saverContext; - saverContext.SetTierName(tierName).SetExternalCompression(compression); - - TSplitLimiter limiter(granuleMeta, Counters, resultSchema, batch); - - std::vector<TString> portionBlobs; - std::shared_ptr<arrow::RecordBatch> portionBatch; - while (limiter.Next(portionBlobs, portionBatch, saverContext)) { - TPortionInfo portionInfo; - portionInfo.Records.reserve(resultSchema->GetSchema()->num_fields()); - for (auto&& f : resultSchema->GetSchema()->fields()) { - const ui32 columnId = resultSchema->GetIndexInfo().GetColumnId(f->name()); - TColumnRecord record = TColumnRecord::Make(granule, columnId, snapshot, 0); - 
portionInfo.AppendOneChunkColumn(std::move(record)); - } - for (auto&& i : portionBlobs) { - blobs.emplace_back(i); - } - portionInfo.AddMetadata(*resultSchema, portionBatch, tierName); - out.emplace_back(std::move(portionInfo)); - } - - return out; -} - -std::pair<std::vector<std::shared_ptr<arrow::RecordBatch>>, TSnapshot> -TCompactionLogic::PortionsToBatches(const std::vector<TPortionInfo>& portions, const THashMap<TBlobRange, TString>& blobs, - const bool insertedOnly) const { - std::vector<std::shared_ptr<arrow::RecordBatch>> batches; - batches.reserve(portions.size()); - const auto resultSchema = SchemaVersions.GetLastSchema(); - TSnapshot maxSnapshot = resultSchema->GetSnapshot(); - for (const auto& portionInfo : portions) { - if (!insertedOnly || portionInfo.IsInserted()) { - const auto blobSchema = SchemaVersions.GetSchema(portionInfo.GetSnapshot()); - - batches.push_back(portionInfo.AssembleInBatch(*blobSchema, *resultSchema, blobs)); - - if (maxSnapshot < portionInfo.GetSnapshot()) { - maxSnapshot = portionInfo.GetSnapshot(); - } - } - } - return std::make_pair(std::move(batches), maxSnapshot); -} - -TConclusion<std::vector<TString>> TIndexationLogic::DoApply(std::shared_ptr<TColumnEngineChanges> indexChanges) const noexcept { - auto changes = std::dynamic_pointer_cast<TInsertColumnEngineChanges>(indexChanges); - Y_VERIFY(changes); - Y_VERIFY(!changes->DataToIndex.empty()); - Y_VERIFY(changes->AppendedPortions.empty()); - - auto maxSnapshot = TSnapshot::Zero(); - for (auto& inserted : changes->DataToIndex) { - TSnapshot insertSnap = inserted.GetSnapshot(); - Y_VERIFY(insertSnap.Valid()); - if (insertSnap > maxSnapshot) { - maxSnapshot = insertSnap; - } - } - Y_VERIFY(maxSnapshot.Valid()); - - auto resultSchema = SchemaVersions.GetSchema(maxSnapshot); - Y_VERIFY(resultSchema->GetIndexInfo().IsSorted()); - - THashMap<ui64, std::vector<std::shared_ptr<arrow::RecordBatch>>> pathBatches; - for (auto& inserted : changes->DataToIndex) { - TBlobRange 
blobRange(inserted.BlobId, 0, inserted.BlobId.BlobSize()); - - auto blobSchema = SchemaVersions.GetSchema(inserted.GetSchemaSnapshot()); - auto& indexInfo = blobSchema->GetIndexInfo(); - Y_VERIFY(indexInfo.IsSorted()); - - std::shared_ptr<arrow::RecordBatch> batch; - if (auto it = changes->CachedBlobs.find(inserted.BlobId); it != changes->CachedBlobs.end()) { - batch = it->second; - } else if (auto* blobData = changes->Blobs.FindPtr(blobRange)) { - Y_VERIFY(!blobData->empty(), "Blob data not present"); - // Prepare batch - batch = NArrow::DeserializeBatch(*blobData, indexInfo.ArrowSchema()); - if (!batch) { - AFL_ERROR(NKikimrServices::TX_COLUMNSHARD) - ("event", "cannot_parse") - ("data_snapshot", TStringBuilder() << inserted.GetSnapshot()) - ("index_snapshot", TStringBuilder() << blobSchema->GetSnapshot()); - } - } else { - Y_VERIFY(blobData, "Data for range %s has not been read", blobRange.ToString().c_str()); - } - Y_VERIFY(batch); - - batch = AddSpecials(batch, blobSchema->GetIndexInfo(), inserted); - batch = resultSchema->NormalizeBatch(*blobSchema, batch); - pathBatches[inserted.PathId].push_back(batch); - Y_VERIFY_DEBUG(NArrow::IsSorted(pathBatches[inserted.PathId].back(), resultSchema->GetIndexInfo().GetReplaceKey())); - } - std::vector<TString> blobs; - - for (auto& [pathId, batches] : pathBatches) { - changes->AddPathIfNotExists(pathId); - - // We could merge data here cause tablet limits indexing data portions - auto merged = NArrow::CombineSortedBatches(batches, resultSchema->GetIndexInfo().SortReplaceDescription()); - Y_VERIFY(merged); - Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(merged, resultSchema->GetIndexInfo().GetReplaceKey())); - - auto granuleBatches = TMarksGranules::SliceIntoGranules(merged, changes->PathToGranule[pathId], resultSchema->GetIndexInfo()); - for (auto& [granule, batch] : granuleBatches) { - auto portions = MakeAppendedPortions(pathId, batch, granule, maxSnapshot, blobs, changes->GetGranuleMeta()); - Y_VERIFY(portions.size() > 0); 
- for (auto& portion : portions) { - changes->AppendedPortions.emplace_back(std::move(portion)); - } - } - } - - Y_VERIFY(changes->PathToGranule.size() == pathBatches.size()); - return blobs; -} - -std::pair<std::shared_ptr<arrow::RecordBatch>, TSnapshot> TCompactionLogic::CompactInOneGranule(ui64 granule, - const std::vector<TPortionInfo>& portions, - const THashMap<TBlobRange, TString>& blobs) const { - std::vector<std::shared_ptr<arrow::RecordBatch>> batches; - batches.reserve(portions.size()); - - auto resultSchema = SchemaVersions.GetLastSchema(); - - TSnapshot maxSnapshot = resultSchema->GetSnapshot(); - for (auto& portionInfo : portions) { - Y_VERIFY(!portionInfo.Empty()); - Y_VERIFY(portionInfo.Granule() == granule); - auto blobSchema = SchemaVersions.GetSchema(portionInfo.GetSnapshot()); - auto batch = portionInfo.AssembleInBatch(*blobSchema, *resultSchema, blobs); - batches.push_back(batch); - if (portionInfo.GetSnapshot() > maxSnapshot) { - maxSnapshot = portionInfo.GetSnapshot(); - } - } - - auto sortedBatch = NArrow::CombineSortedBatches(batches, resultSchema->GetIndexInfo().SortReplaceDescription()); - Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(sortedBatch, resultSchema->GetIndexInfo().GetReplaceKey())); - - return std::make_pair(sortedBatch, maxSnapshot); -} - -std::vector<TString> TCompactionLogic::CompactInGranule(std::shared_ptr<TCompactColumnEngineChanges> changes) const { - const ui64 pathId = changes->SrcGranule->PathId; - std::vector<TString> blobs; - auto& switchedPortions = changes->SwitchedPortions; - Y_VERIFY(switchedPortions.size()); - - ui64 granule = switchedPortions[0].Granule(); - auto [batch, maxSnapshot] = CompactInOneGranule(granule, switchedPortions, changes->Blobs); - - auto resultSchema = SchemaVersions.GetLastSchema(); - std::vector<TPortionInfo> portions; - if (!changes->MergeBorders.Empty()) { - Y_VERIFY(changes->MergeBorders.GetOrderedMarks().size() > 1); - auto slices = changes->MergeBorders.SliceIntoGranules(batch, 
resultSchema->GetIndexInfo()); - portions.reserve(slices.size()); - - for (auto& [_, slice] : slices) { - if (!slice || slice->num_rows() == 0) { - continue; - } - auto tmp = MakeAppendedPortions(pathId, slice, granule, maxSnapshot, blobs, changes->GetGranuleMeta()); - for (auto&& portionInfo : tmp) { - portions.emplace_back(std::move(portionInfo)); - } - } - } else { - portions = MakeAppendedPortions(pathId, batch, granule, maxSnapshot, blobs, changes->GetGranuleMeta()); - } - - Y_VERIFY(portions.size() > 0); - Y_VERIFY(changes->AppendedPortions.empty()); - // Set appended portions. - changes->AppendedPortions.swap(portions); - - return blobs; -} - -std::vector<std::pair<TMark, std::shared_ptr<arrow::RecordBatch>>> -TCompactionLogic::SliceGranuleBatches(const TIndexInfo& indexInfo, - const TCompactColumnEngineChanges& changes, - const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches, - const TMark& ts0) const { - std::vector<std::pair<TMark, std::shared_ptr<arrow::RecordBatch>>> out; - - // Extract unique effective keys and their counts - i64 numRows = 0; - TMap<NArrow::TReplaceKey, ui32> uniqKeyCount; - for (const auto& batch : batches) { - Y_VERIFY(batch); - if (batch->num_rows() == 0) { - continue; - } - - numRows += batch->num_rows(); - - const auto effKey = TMarksGranules::GetEffectiveKey(batch, indexInfo); - Y_VERIFY(effKey->num_columns() && effKey->num_rows()); - - auto effColumns = std::make_shared<NArrow::TArrayVec>(effKey->columns()); - for (int row = 0; row < effKey->num_rows(); ++row) { - ++uniqKeyCount[NArrow::TReplaceKey(effColumns, row)]; - } - } - - Y_VERIFY(uniqKeyCount.size()); - auto minTs = uniqKeyCount.begin()->first; - auto maxTs = uniqKeyCount.rbegin()->first; - Y_VERIFY(minTs >= ts0.GetBorder()); - - // It's an estimation of needed count cause numRows calculated before key replaces - ui32 rowsInGranule = numRows / changes.NumSplitInto(numRows); - Y_VERIFY(rowsInGranule); - - // Cannot split in case of one unique key - if 
(uniqKeyCount.size() == 1) { - // We have to split big batch of same key in several portions - auto merged = NArrow::MergeSortedBatches(batches, indexInfo.SortReplaceDescription(), rowsInGranule); - for (auto& batch : merged) { - Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(batch, indexInfo.GetReplaceKey())); - out.emplace_back(ts0, batch); - } - return out; - } - - // Make split borders from uniq keys - std::vector<NArrow::TReplaceKey> borders; - borders.reserve(numRows / rowsInGranule); - { - ui32 sumRows = 0; - for (auto& [ts, num] : uniqKeyCount) { - if (sumRows >= rowsInGranule) { - borders.emplace_back(ts); - sumRows = 0; - } - sumRows += num; - } - if (borders.empty()) { - borders.emplace_back(maxTs); // huge trailing key - } - Y_VERIFY(borders.size()); - } - - // Find offsets in source batches - std::vector<std::vector<int>> offsets(batches.size()); // vec[batch][border] = offset - for (size_t i = 0; i < batches.size(); ++i) { - const auto& batch = batches[i]; - auto& batchOffsets = offsets[i]; - batchOffsets.reserve(borders.size() + 1); - - const auto effKey = TMarksGranules::GetEffectiveKey(batch, indexInfo); - Y_VERIFY(effKey->num_columns() && effKey->num_rows()); - - std::vector<NArrow::TRawReplaceKey> keys; - { - const auto& columns = effKey->columns(); - keys.reserve(effKey->num_rows()); - for (i64 i = 0; i < effKey->num_rows(); ++i) { - keys.emplace_back(NArrow::TRawReplaceKey(&columns, i)); - } - } - - batchOffsets.push_back(0); - for (const auto& border : borders) { - int offset = NArrow::LowerBound(keys, border, batchOffsets.back()); - Y_VERIFY(offset >= batchOffsets.back()); - Y_VERIFY(offset <= batch->num_rows()); - batchOffsets.push_back(offset); - } - - Y_VERIFY(batchOffsets.size() == borders.size() + 1); - } - - // Make merge-sorted granule batch for each splitted granule - for (ui32 granuleNo = 0; granuleNo < borders.size() + 1; ++granuleNo) { - std::vector<std::shared_ptr<arrow::RecordBatch>> granuleBatches; - 
granuleBatches.reserve(batches.size()); - const bool lastGranule = (granuleNo == borders.size()); - - // Extract granule: slice source batches with offsets - i64 granuleNumRows = 0; - for (size_t i = 0; i < batches.size(); ++i) { - const auto& batch = batches[i]; - auto& batchOffsets = offsets[i]; - - int offset = batchOffsets[granuleNo]; - int end = lastGranule ? batch->num_rows() : batchOffsets[granuleNo + 1]; - int size = end - offset; - Y_VERIFY(size >= 0); - - if (size) { - Y_VERIFY(offset < batch->num_rows()); - auto slice = batch->Slice(offset, size); - Y_VERIFY(slice->num_rows()); - granuleNumRows += slice->num_rows(); -#if 1 // Check correctness - const auto effKey = TMarksGranules::GetEffectiveKey(slice, indexInfo); - Y_VERIFY(effKey->num_columns() && effKey->num_rows()); - - auto startKey = granuleNo ? borders[granuleNo - 1] : minTs; - Y_VERIFY(NArrow::TReplaceKey::FromBatch(effKey, 0) >= startKey); - - NArrow::TReplaceKey lastSliceKey = NArrow::TReplaceKey::FromBatch(effKey, effKey->num_rows() - 1); - if (granuleNo < borders.size() - 1) { - const auto& endKey = borders[granuleNo]; - Y_VERIFY(lastSliceKey < endKey); - } else { - Y_VERIFY(lastSliceKey <= maxTs); - } -#endif - Y_VERIFY_DEBUG(NArrow::IsSorted(slice, indexInfo.GetReplaceKey())); - granuleBatches.emplace_back(slice); - } - } - - // Merge slices. We have to split a big key batches in several ones here. 
- if (granuleNumRows > 4 * rowsInGranule) { - granuleNumRows = rowsInGranule; - } - auto merged = NArrow::MergeSortedBatches(granuleBatches, indexInfo.SortReplaceDescription(), granuleNumRows); - for (auto& batch : merged) { - Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(batch, indexInfo.GetReplaceKey())); - - auto startKey = ts0.GetBorder(); - if (granuleNo) { - startKey = borders[granuleNo - 1]; - } -#if 1 // Check correctness - const auto effKey = TMarksGranules::GetEffectiveKey(batch, indexInfo); - Y_VERIFY(effKey->num_columns() && effKey->num_rows()); - - Y_VERIFY(NArrow::TReplaceKey::FromBatch(effKey, 0) >= startKey); -#endif - out.emplace_back(TMark(startKey), batch); - } - } - - return out; -} - -ui64 TCompactionLogic::TryMovePortions(const TMark& ts0, - std::vector<TPortionInfo>& portions, - std::vector<std::pair<TMark, ui64>>& tsIds, - std::vector<std::pair<TPortionInfo, ui64>>& toMove) const { - std::vector<TPortionInfo*> partitioned(portions.size()); - // Split portions by putting the inserted portions in the original order - // at the beginning of the buffer and the compacted portions at the end. - // The compacted portions will be put in the reversed order, but it will be sorted later. - const auto [inserted, compacted] = [&]() { - size_t l = 0; - size_t r = portions.size(); - - for (auto& portionInfo : portions) { - partitioned[(portionInfo.IsInserted() ? l++ : --r)] = &portionInfo; - } - - return std::make_tuple(std::span(partitioned.begin(), l), std::span(partitioned.begin() + l, partitioned.end())); - }(); - - Counters.AnalizeCompactedPortions->Add(compacted.size()); - Counters.AnalizeInsertedPortions->Add(inserted.size()); - for (auto&& i : portions) { - if (i.IsInserted()) { - Counters.AnalizeInsertedBytes->Add(i.BlobsBytes()); - } else { - Counters.AnalizeCompactedBytes->Add(i.BlobsBytes()); - } - } - - // Do nothing if there are less than two compacted portions. 
- if (compacted.size() < 2) { - return 0; - } - // Order compacted portions by primary key. - std::sort(compacted.begin(), compacted.end(), [](const TPortionInfo* a, const TPortionInfo* b) { - return a->IndexKeyStart() < b->IndexKeyStart(); - }); - for (auto&& i : inserted) { - Counters.RepackedInsertedPortionBytes->Add(i->BlobsBytes()); - } - Counters.RepackedInsertedPortions->Add(inserted.size()); - - // Check that there are no gaps between two adjacent portions in term of primary key range. - for (size_t i = 0; i < compacted.size() - 1; ++i) { - if (compacted[i]->IndexKeyEnd() >= compacted[i + 1]->IndexKeyStart()) { - for (auto&& c : compacted) { - Counters.SkipPortionBytesMoveThroughIntersection->Add(c->BlobsBytes()); - } - - Counters.SkipPortionsMoveThroughIntersection->Add(compacted.size()); - Counters.RepackedCompactedPortions->Add(compacted.size()); - return 0; - } - } - - toMove.reserve(compacted.size()); - ui64 numRows = 0; - ui32 counter = 0; - for (auto* portionInfo : compacted) { - ui32 rows = portionInfo->NumRows(); - Y_VERIFY(rows); - numRows += rows; - Counters.MovedPortionBytes->Add(portionInfo->BlobsBytes()); - tsIds.emplace_back((counter ? TMark(portionInfo->IndexKeyStart()) : ts0), counter + 1); - toMove.emplace_back(std::move(*portionInfo), counter); - ++counter; - // Ensure that std::move will take an effect. - static_assert(std::swappable<decltype(*portionInfo)>); - } - Counters.MovedPortions->Add(toMove.size()); - - std::vector<TPortionInfo> out; - out.reserve(inserted.size()); - for (auto* portionInfo : inserted) { - out.emplace_back(std::move(*portionInfo)); - // Ensure that std::move will take an effect. 
- static_assert(std::swappable<decltype(*portionInfo)>); - } - portions.swap(out); - - return numRows; -} - -std::vector<TString> TCompactionLogic::CompactSplitGranule(const std::shared_ptr<TCompactColumnEngineChanges>& changes) const { - const ui64 pathId = changes->SrcGranule->PathId; - const TMark ts0 = changes->SrcGranule->Mark; - std::vector<TPortionInfo>& portions = changes->SwitchedPortions; - - std::vector<std::pair<TMark, ui64>> tsIds; - ui64 movedRows = TryMovePortions(ts0, portions, tsIds, changes->PortionsToMove); - auto [srcBatches, maxSnapshot] = PortionsToBatches(portions, changes->Blobs, movedRows != 0); - Y_VERIFY(srcBatches.size() == portions.size()); - - std::vector<TString> blobs; - auto resultSchema = SchemaVersions.GetLastSchema(); - if (movedRows) { - Y_VERIFY(changes->PortionsToMove.size() >= 2); - Y_VERIFY(changes->PortionsToMove.size() == tsIds.size()); - Y_VERIFY(tsIds.begin()->first == ts0); - - // Calculate total number of rows. - ui64 numRows = movedRows; - for (const auto& batch : srcBatches) { - numRows += batch->num_rows(); - } - - // Recalculate new granules borders (if they are larger then portions) - ui32 numSplitInto = changes->NumSplitInto(numRows); - if (numSplitInto < tsIds.size()) { - const ui32 rowsInGranule = numRows / numSplitInto; - Y_VERIFY(rowsInGranule); - - std::vector<std::pair<TMark, ui64>> newTsIds; - ui32 tmpGranule = 0; - ui32 sumRows = 0; - // Always insert mark of the source granule at the beginning. - newTsIds.emplace_back(ts0, 1); - - for (size_t i = 0, end = tsIds.size(); i != end; ++i) { - const TMark& ts = tsIds[i].first; - // Make new granule if the current number of rows is exceeded the allowed number of rows in the granule - // or there is the end of the ids and nothing was inserted so far. 
- if (sumRows >= rowsInGranule || (i + 1 == end && newTsIds.size() == 1)) { - ++tmpGranule; - newTsIds.emplace_back(ts, tmpGranule + 1); - sumRows = 0; - } - - auto& toMove = changes->PortionsToMove[i]; - sumRows += toMove.first.NumRows(); - toMove.second = tmpGranule; - } - - tsIds.swap(newTsIds); - } - Y_VERIFY(tsIds.size() > 1); - Y_VERIFY(tsIds[0] == std::make_pair(ts0, ui64(1))); - TMarksGranules marksGranules(std::move(tsIds)); - - // Slice inserted portions with granules' borders - THashMap<ui64, std::vector<std::shared_ptr<arrow::RecordBatch>>> idBatches; - std::vector<TPortionInfo*> toSwitch; - toSwitch.reserve(portions.size()); - for (size_t i = 0; i < portions.size(); ++i) { - auto& portion = portions[i]; - auto& batch = srcBatches[i]; - auto slices = marksGranules.SliceIntoGranules(batch, resultSchema->GetIndexInfo()); - - THashSet<ui64> ids; - for (auto& [id, slice] : slices) { - if (slice && slice->num_rows()) { - ids.insert(id); - idBatches[id].emplace_back(std::move(slice)); - } - } - - // Optimization: move not splitted inserted portions. Do not reappend them. - if (ids.size() == 1) { - ui64 id = *ids.begin(); - idBatches[id].resize(idBatches[id].size() - 1); - ui64 tmpGranule = id - 1; - changes->PortionsToMove.emplace_back(std::move(portion), tmpGranule); - } else { - toSwitch.push_back(&portion); - } - } - - // Update switchedPortions if we have moves - if (toSwitch.size() != portions.size()) { - std::vector<TPortionInfo> tmp; - tmp.reserve(toSwitch.size()); - for (auto* portionInfo : toSwitch) { - tmp.emplace_back(std::move(*portionInfo)); - } - portions.swap(tmp); - } - - for (const auto& [mark, id] : marksGranules.GetOrderedMarks()) { - ui64 tmpGranule = changes->SetTmpGranule(pathId, mark); - for (const auto& batch : idBatches[id]) { - // Cannot set snapshot here. It would be set in committing transaction in ApplyChanges(). 
- auto newPortions = MakeAppendedPortions(pathId, batch, tmpGranule, maxSnapshot, blobs, changes->GetGranuleMeta()); - Y_VERIFY(newPortions.size() > 0); - for (auto& portion : newPortions) { - changes->AppendedPortions.emplace_back(std::move(portion)); - } - } - } - } else { - auto batches = SliceGranuleBatches(resultSchema->GetIndexInfo(), *changes, srcBatches, ts0); - - changes->SetTmpGranule(pathId, ts0); - for (auto& [ts, batch] : batches) { - // Tmp granule would be updated to correct value in ApplyChanges() - ui64 tmpGranule = changes->SetTmpGranule(pathId, ts); - - // Cannot set snapshot here. It would be set in committing transaction in ApplyChanges(). - auto portions = MakeAppendedPortions(pathId, batch, tmpGranule, maxSnapshot, blobs, changes->GetGranuleMeta()); - Y_VERIFY(portions.size() > 0); - for (auto& portion : portions) { - changes->AppendedPortions.emplace_back(std::move(portion)); - } - } - } - - return blobs; -} - -TConclusion<std::vector<TString>> TCompactionLogic::DoApply(std::shared_ptr<TColumnEngineChanges> changes) const noexcept { - auto castedChanges = std::dynamic_pointer_cast<TCompactColumnEngineChanges>(changes); - Y_VERIFY(castedChanges); - Y_VERIFY(!castedChanges->Blobs.empty()); // src data - Y_VERIFY(!castedChanges->SwitchedPortions.empty()); // src meta - Y_VERIFY(castedChanges->AppendedPortions.empty()); // dst meta - - if (castedChanges->CompactionInfo->InGranule()) { - return CompactInGranule(castedChanges); - } else { - return CompactSplitGranule(castedChanges); - } -} - -TConclusion<std::vector<TString>> TEvictionLogic::DoApply(std::shared_ptr<TColumnEngineChanges> changesExt) const noexcept { - auto changes = std::dynamic_pointer_cast<TTTLColumnEngineChanges>(changesExt); - Y_VERIFY(changes); - Y_VERIFY(!changes->Blobs.empty()); // src data - Y_VERIFY(!changes->PortionsToEvict.empty()); // src meta - Y_VERIFY(changes->EvictedRecords.empty()); // dst meta - - std::vector<TString> newBlobs; - 
std::vector<std::pair<TPortionInfo, TPortionEvictionFeatures>> evicted; - evicted.reserve(changes->PortionsToEvict.size()); - - for (auto& [portionInfo, evictFeatures] : changes->PortionsToEvict) { - Y_VERIFY(!portionInfo.Empty()); - Y_VERIFY(portionInfo.IsActive()); - - if (UpdateEvictedPortion(portionInfo, evictFeatures, changes->Blobs, - changes->EvictedRecords, newBlobs)) { - Y_VERIFY(portionInfo.TierName == evictFeatures.TargetTierName); - evicted.emplace_back(std::move(portionInfo), evictFeatures); - } - } - - changes->PortionsToEvict.swap(evicted); - return newBlobs; -} -} diff --git a/ydb/core/tx/columnshard/engines/index_logic_logs.h b/ydb/core/tx/columnshard/engines/index_logic_logs.h deleted file mode 100644 index 3b13506a5da..00000000000 --- a/ydb/core/tx/columnshard/engines/index_logic_logs.h +++ /dev/null @@ -1,123 +0,0 @@ -#pragma once - -#include "defs.h" -#include "portion_info.h" -#include "column_engine_logs.h" -#include <ydb/core/tx/columnshard/counters.h> - -namespace NKikimr::NOlap { - -class TIndexLogicBase { -protected: - const TVersionedIndex& SchemaVersions; - const NColumnShard::TIndexationCounters Counters; - virtual TConclusion<std::vector<TString>> DoApply(std::shared_ptr<TColumnEngineChanges> indexChanges) const noexcept = 0; -private: - const THashMap<ui64, NKikimr::NOlap::TTiering>* TieringMap = nullptr; -public: - TIndexLogicBase(const TVersionedIndex& indexInfo, const THashMap<ui64, NKikimr::NOlap::TTiering>& tieringMap, - const NColumnShard::TIndexationCounters& counters) - : SchemaVersions(indexInfo) - , Counters(counters) - , TieringMap(&tieringMap) - { - } - - TIndexLogicBase(const TVersionedIndex& indexInfo, const NColumnShard::TIndexationCounters& counters) - : SchemaVersions(indexInfo) - , Counters(counters) - { - } - - virtual ~TIndexLogicBase() { - } - TConclusion<std::vector<TString>> Apply(std::shared_ptr<TColumnEngineChanges> indexChanges) const noexcept { - { - ui64 readBytes = 0; - for (auto&& i : 
indexChanges->Blobs) { - readBytes += i.first.Size; - } - Counters.CompactionInputSize(readBytes); - } - const TMonotonic start = TMonotonic::Now(); - TConclusion<std::vector<TString>> result = DoApply(indexChanges); - if (result.IsSuccess()) { - Counters.CompactionDuration->Collect((TMonotonic::Now() - start).MilliSeconds()); - } else { - Counters.CompactionFails->Add(1); - } - return result; - } - -protected: - std::vector<TPortionInfo> MakeAppendedPortions(const ui64 pathId, - const std::shared_ptr<arrow::RecordBatch> batch, - const ui64 granule, - const TSnapshot& minSnapshot, - std::vector<TString>& blobs, const TGranuleMeta* granuleMeta) const; - - const THashMap<ui64, NKikimr::NOlap::TTiering>& GetTieringMap() const { - if (TieringMap) { - return *TieringMap; - } - return Default<THashMap<ui64, NKikimr::NOlap::TTiering>>(); - } -}; - -class TIndexationLogic: public TIndexLogicBase { -public: - using TIndexLogicBase::TIndexLogicBase; -protected: - virtual TConclusion<std::vector<TString>> DoApply(std::shared_ptr<TColumnEngineChanges> indexChanges) const noexcept override; -private: - // Although source batches are ordered only by PK (sorting key) resulting pathBatches are ordered by extended key. - // They have const snapshot columns that do not break sorting inside batch. 
- std::shared_ptr<arrow::RecordBatch> AddSpecials(const std::shared_ptr<arrow::RecordBatch>& srcBatch, - const TIndexInfo& indexInfo, const TInsertedData& inserted) const; -}; - -class TCompactionLogic: public TIndexLogicBase { -public: - using TIndexLogicBase::TIndexLogicBase; - -protected: - virtual TConclusion<std::vector<TString>> DoApply(std::shared_ptr<TColumnEngineChanges> indexChanges) const noexcept override; -private: - std::vector<TString> CompactSplitGranule(const std::shared_ptr<TCompactColumnEngineChanges>& changes) const; - std::vector<TString> CompactInGranule(std::shared_ptr<TCompactColumnEngineChanges> changes) const; - std::pair<std::shared_ptr<arrow::RecordBatch>, TSnapshot> CompactInOneGranule(ui64 granule, const std::vector<TPortionInfo>& portions, const THashMap<TBlobRange, TString>& blobs) const; - - /// @return vec({ts, batch}). ts0 <= ts1 <= ... <= tsN - /// @note We use ts from PK for split but there could be lots PK with the same ts. - std::vector<std::pair<TMark, std::shared_ptr<arrow::RecordBatch>>> - SliceGranuleBatches(const TIndexInfo& indexInfo, - const TCompactColumnEngineChanges& changes, - const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches, - const TMark& ts0) const; - - /// @param[in,out] portions unchanged or only inserted portions in the same orders - /// @param[in,out] tsIds unchanged or marks from compacted portions ordered by mark - /// @param[in,out] toMove unchanged or compacted portions ordered by primary key - ui64 TryMovePortions(const TMark& ts0, - std::vector<TPortionInfo>& portions, - std::vector<std::pair<TMark, ui64>>& tsIds, - std::vector<std::pair<TPortionInfo, ui64>>& toMove) const; - - std::pair<std::vector<std::shared_ptr<arrow::RecordBatch>>, TSnapshot> PortionsToBatches(const std::vector<TPortionInfo>& portions, - const THashMap<TBlobRange, TString>& blobs, - bool insertedOnly = false) const; -}; - -class TEvictionLogic: public TIndexLogicBase { -public: - using 
TIndexLogicBase::TIndexLogicBase; - -protected: - virtual TConclusion<std::vector<TString>> DoApply(std::shared_ptr<TColumnEngineChanges> indexChanges) const noexcept override; -private: - bool UpdateEvictedPortion(TPortionInfo& portionInfo, - TPortionEvictionFeatures& evictFeatures, const THashMap<TBlobRange, TString>& srcBlobs, - std::vector<TColumnRecord>& evictedRecords, std::vector<TString>& newBlobs) const; -}; - -} diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp index 57bec00c40c..ef49628706e 100644 --- a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp +++ b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp @@ -2,6 +2,7 @@ #include "indexed_read_data.h" #include "filter.h" #include "column_engine_logs.h" +#include "changes/mark_granules.h" #include <ydb/core/tx/columnshard/columnshard__index_scan.h> #include <ydb/core/tx/columnshard/columnshard__stats_scan.h> #include <ydb/core/formats/arrow/one_batch_input_stream.h> diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp index b20a716f9e6..a7a36b0813a 100644 --- a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp +++ b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp @@ -221,10 +221,11 @@ std::shared_ptr<arrow::Scalar> TPortionInfo::MinValue(ui32 columnId) const { } std::shared_ptr<arrow::Scalar> TPortionInfo::MaxValue(ui32 columnId) const { - if (!Meta.ColumnMeta.contains(columnId)) { + auto it = Meta.ColumnMeta.find(columnId); + if (it == Meta.ColumnMeta.end()) { return {}; } - return Meta.ColumnMeta.find(columnId)->second.Max; + return it->second.Max; } std::shared_ptr<arrow::ChunkedArray> TPortionInfo::TPreparedColumn::Assemble() const { diff --git a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp index 04617def4e5..914ef584cb0 100644 --- 
a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp +++ b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp @@ -1,7 +1,6 @@ #include <library/cpp/testing/unittest/registar.h> #include "column_engine_logs.h" #include "predicate/predicate.h" -#include "index_logic_logs.h" #include <ydb/core/tx/columnshard/columnshard_ut_common.h> @@ -267,15 +266,16 @@ bool Insert(TColumnEngineForLogs& engine, TTestDbWrapper& db, TSnapshot snap, } changes->Blobs.insert(blobs.begin(), blobs.end()); + changes->StartEmergency(); + + NOlap::TConstructionContext context(engine.GetVersionedIndex(), NColumnShard::TIndexationCounters("Indexation")); + std::vector<TString> newBlobs = std::move(changes->ConstructBlobs(context).DetachResult()); - TIndexationLogic logic(engine.GetVersionedIndex(), NColumnShard::TIndexationCounters("Indexation")); - std::vector<TString> newBlobs = std::move(logic.Apply(changes).DetachResult()); UNIT_ASSERT_VALUES_EQUAL(changes->AppendedPortions.size(), 1); UNIT_ASSERT_VALUES_EQUAL(newBlobs.size(), testColumns.size() + 2); // add 2 columns: planStep, txId AddIdsToBlobs(newBlobs, changes->AppendedPortions, blobs, step); - changes->StartEmergency(); const bool result = engine.ApplyChanges(db, changes, snap); changes->AbortEmergency(); return result; @@ -296,15 +296,15 @@ bool Compact(TColumnEngineForLogs& engine, TTestDbWrapper& db, TSnapshot snap, T std::shared_ptr<TCompactColumnEngineChanges> changes = engine.StartCompaction(std::move(compactionInfo), TestLimits()); UNIT_ASSERT_VALUES_EQUAL(changes->SwitchedPortions.size(), expected.SrcPortions); changes->SetBlobs(std::move(blobs)); + changes->StartEmergency(); + NOlap::TConstructionContext context(engine.GetVersionedIndex(), NColumnShard::TIndexationCounters("Compaction")); + std::vector<TString> newBlobs = std::move(changes->ConstructBlobs(context).DetachResult()); - TCompactionLogic logic(engine.GetVersionedIndex(), NColumnShard::TIndexationCounters("Compaction")); - std::vector<TString> newBlobs = 
std::move(logic.Apply(changes).DetachResult()); UNIT_ASSERT_VALUES_EQUAL(changes->AppendedPortions.size(), expected.NewPortions); AddIdsToBlobs(newBlobs, changes->AppendedPortions, changes->Blobs, step); UNIT_ASSERT_VALUES_EQUAL(changes->GetTmpGranuleIds().size(), expected.NewGranules); - changes->StartEmergency(); const bool result = engine.ApplyChanges(db, changes, snap); changes->AbortEmergency(); return result; diff --git a/ydb/core/tx/columnshard/engines/ut_program.cpp b/ydb/core/tx/columnshard/engines/ut_program.cpp index 6b112a6a110..fecb3e8080e 100644 --- a/ydb/core/tx/columnshard/engines/ut_program.cpp +++ b/ydb/core/tx/columnshard/engines/ut_program.cpp @@ -1,5 +1,4 @@ #include "index_info.h" -#include "index_logic_logs.h" #include <ydb/core/tx/columnshard/columnshard__index_scan.h> #include <ydb/core/tx/columnshard/columnshard_ut_common.h> diff --git a/ydb/core/tx/columnshard/engines/ya.make b/ydb/core/tx/columnshard/engines/ya.make index 6c1a4c78344..54de6c172cc 100644 --- a/ydb/core/tx/columnshard/engines/ya.make +++ b/ydb/core/tx/columnshard/engines/ya.make @@ -11,7 +11,6 @@ SRCS( db_wrapper.cpp index_info.cpp indexed_read_data.cpp - index_logic_logs.cpp filter.cpp portion_info.cpp tier_info.cpp diff --git a/ydb/core/tx/columnshard/eviction_actor.cpp b/ydb/core/tx/columnshard/eviction_actor.cpp index f6221302782..82621bf8615 100644 --- a/ydb/core/tx/columnshard/eviction_actor.cpp +++ b/ydb/core/tx/columnshard/eviction_actor.cpp @@ -1,6 +1,5 @@ #include "columnshard_impl.h" #include <ydb/core/tx/columnshard/engines/column_engine_logs.h> -#include <ydb/core/tx/columnshard/engines/index_logic_logs.h> #include "blob_cache.h" namespace NKikimr::NColumnShard { @@ -133,8 +132,8 @@ private: auto guard = TxEvent->PutResult->StartCpuGuard(); TxEvent->IndexChanges->SetBlobs(std::move(Blobs)); - NOlap::TEvictionLogic evictionLogic(TxEvent->IndexInfo, TxEvent->Tiering, Counters); - TxEvent->Blobs = 
std::move(evictionLogic.Apply(TxEvent->IndexChanges).DetachResult()); + NOlap::TConstructionContext context(TxEvent->IndexInfo, TxEvent->Tiering, Counters); + TxEvent->Blobs = std::move(TxEvent->IndexChanges->ConstructBlobs(context).DetachResult()); if (TxEvent->Blobs.empty()) { TxEvent->SetPutStatus(NKikimrProto::OK); } @@ -143,7 +142,6 @@ private: ctx.Send(Parent, TxEvent.release()); LOG_S_DEBUG("Portions eviction finished (" << blobsSize << " new blobs) at tablet " << TabletId); - //Die(ctx); // It's alive till tablet's death } }; diff --git a/ydb/core/tx/columnshard/indexing_actor.cpp b/ydb/core/tx/columnshard/indexing_actor.cpp index cafdd83dcf4..e320d935589 100644 --- a/ydb/core/tx/columnshard/indexing_actor.cpp +++ b/ydb/core/tx/columnshard/indexing_actor.cpp @@ -2,7 +2,6 @@ #include "columnshard_impl.h" #include "engines/changes/indexation.h" #include <ydb/core/tx/columnshard/engines/column_engine_logs.h> -#include <ydb/core/tx/columnshard/engines/index_logic_logs.h> #include <ydb/core/tx/conveyor/usage/events.h> #include <ydb/core/tx/conveyor/usage/service.h> @@ -128,8 +127,8 @@ private: virtual bool DoExecute() override { auto guard = TxEvent->PutResult->StartCpuGuard(); - NOlap::TIndexationLogic indexationLogic(TxEvent->IndexInfo, TxEvent->Tiering, Counters); - TxEvent->Blobs = std::move(indexationLogic.Apply(TxEvent->IndexChanges).DetachResult()); + NOlap::TConstructionContext context(TxEvent->IndexInfo, TxEvent->Tiering, Counters); + TxEvent->Blobs = std::move(TxEvent->IndexChanges->ConstructBlobs(context).DetachResult()); return true; } public: diff --git a/ydb/core/tx/columnshard/splitter/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/splitter/CMakeLists.darwin-x86_64.txt new file mode 100644 index 00000000000..9fb62b49ac6 --- /dev/null +++ b/ydb/core/tx/columnshard/splitter/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,20 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. 
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(tx-columnshard-splitter) +target_link_libraries(tx-columnshard-splitter PUBLIC + contrib-libs-cxxsupp + yutil + libs-apache-arrow + columnshard-engines-storage + columnshard-engines-scheme +) +target_sources(tx-columnshard-splitter PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/splitter/splitter.cpp +) diff --git a/ydb/core/tx/columnshard/splitter/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/splitter/CMakeLists.linux-aarch64.txt new file mode 100644 index 00000000000..014ec6fd94d --- /dev/null +++ b/ydb/core/tx/columnshard/splitter/CMakeLists.linux-aarch64.txt @@ -0,0 +1,21 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + + +add_library(tx-columnshard-splitter) +target_link_libraries(tx-columnshard-splitter PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + libs-apache-arrow + columnshard-engines-storage + columnshard-engines-scheme +) +target_sources(tx-columnshard-splitter PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/splitter/splitter.cpp +) diff --git a/ydb/core/tx/columnshard/splitter/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/splitter/CMakeLists.linux-x86_64.txt new file mode 100644 index 00000000000..014ec6fd94d --- /dev/null +++ b/ydb/core/tx/columnshard/splitter/CMakeLists.linux-x86_64.txt @@ -0,0 +1,21 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(tx-columnshard-splitter) +target_link_libraries(tx-columnshard-splitter PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + libs-apache-arrow + columnshard-engines-storage + columnshard-engines-scheme +) +target_sources(tx-columnshard-splitter PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/splitter/splitter.cpp +) diff --git a/ydb/core/tx/columnshard/splitter/CMakeLists.txt b/ydb/core/tx/columnshard/splitter/CMakeLists.txt new file mode 100644 index 00000000000..f8b31df0c11 --- /dev/null +++ b/ydb/core/tx/columnshard/splitter/CMakeLists.txt @@ -0,0 +1,17 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. 
Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-aarch64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA) + include(CMakeLists.windows-x86_64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/ydb/core/tx/columnshard/splitter/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/splitter/CMakeLists.windows-x86_64.txt new file mode 100644 index 00000000000..9fb62b49ac6 --- /dev/null +++ b/ydb/core/tx/columnshard/splitter/CMakeLists.windows-x86_64.txt @@ -0,0 +1,20 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + + +add_library(tx-columnshard-splitter) +target_link_libraries(tx-columnshard-splitter PUBLIC + contrib-libs-cxxsupp + yutil + libs-apache-arrow + columnshard-engines-storage + columnshard-engines-scheme +) +target_sources(tx-columnshard-splitter PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/splitter/splitter.cpp +) diff --git a/ydb/core/tx/columnshard/splitter/splitter.cpp b/ydb/core/tx/columnshard/splitter/splitter.cpp new file mode 100644 index 00000000000..eb22d088b61 --- /dev/null +++ b/ydb/core/tx/columnshard/splitter/splitter.cpp @@ -0,0 +1,112 @@ +#include "splitter.h" + +namespace NKikimr::NOlap { + +TSplitLimiter::TSplitLimiter(const TGranuleMeta* granuleMeta, const NColumnShard::TIndexationCounters& counters, ISnapshotSchema::TPtr schema, const std::shared_ptr<arrow::RecordBatch> batch): Counters(counters) +, Batch(batch) +, Schema(schema) { + if (granuleMeta && granuleMeta->GetAdditiveSummary().GetOther().GetRecordsCount()) { + Y_VERIFY(granuleMeta->GetHardSummary().GetColumnIdsSortedBySizeDescending().size()); + SortedColumnIds = granuleMeta->GetHardSummary().GetColumnIdsSortedBySizeDescending(); + const auto biggestColumn = SortedColumnIds.front(); + Y_VERIFY(biggestColumn.GetPackedBlobsSize()); + const double expectedPackedRecordSize = 1.0 * biggestColumn.GetPackedBlobsSize() / granuleMeta->GetAdditiveSummary().GetOther().GetRecordsCount(); + BaseStepRecordsCount = ExpectedBlobSize / expectedPackedRecordSize; + for (ui32 i = 1; i < SortedColumnIds.size(); ++i) { + Y_VERIFY(SortedColumnIds[i - 1].GetPackedBlobsSize() >= SortedColumnIds[i].GetPackedBlobsSize()); + } + if (BaseStepRecordsCount > batch->num_rows()) { + BaseStepRecordsCount = batch->num_rows(); + } else { + BaseStepRecordsCount = batch->num_rows() / (ui32)(batch->num_rows() / BaseStepRecordsCount); + if (BaseStepRecordsCount * expectedPackedRecordSize > TCompactionLimits::MAX_BLOB_SIZE) { + BaseStepRecordsCount = ExpectedBlobSize / expectedPackedRecordSize; + } + } + } else { 
+ for (auto&& i : Schema->GetIndexInfo().GetColumnIds()) { + SortedColumnIds.emplace_back(TColumnSummary(i)); + } + BaseStepRecordsCount = batch->num_rows(); + } + BaseStepRecordsCount = std::min<ui32>(BaseStepRecordsCount, Batch->num_rows()); + Y_VERIFY(BaseStepRecordsCount); + CurrentStepRecordsCount = BaseStepRecordsCount; +} + +bool TSplitLimiter::Next(std::vector<TString>& portionBlobs, std::shared_ptr<arrow::RecordBatch>& batch, const TSaverContext& saverContext) { + if (Position == Batch->num_rows()) { + return false; + } + + portionBlobs.resize(Schema->GetSchema()->num_fields()); + while (true) { + Y_VERIFY(Position < Batch->num_rows()); + std::shared_ptr<arrow::RecordBatch> currentBatch; + if (Batch->num_rows() - Position < CurrentStepRecordsCount * 1.1) { + currentBatch = Batch->Slice(Position, Batch->num_rows() - Position); + } else { + currentBatch = Batch->Slice(Position, CurrentStepRecordsCount); + } + + ui32 fillCounter = 0; + for (const auto& columnSummary : SortedColumnIds) { + const TString& columnName = Schema->GetIndexInfo().GetColumnName(columnSummary.GetColumnId()); + const int idx = Schema->GetFieldIndex(columnSummary.GetColumnId()); + Y_VERIFY(idx >= 0); + auto field = Schema->GetFieldByIndex(idx); + Y_VERIFY(field); + auto array = currentBatch->GetColumnByName(columnName); + Y_VERIFY(array); + auto columnSaver = Schema->GetColumnSaver(columnSummary.GetColumnId(), saverContext); + TString blob = TPortionInfo::SerializeColumn(array, field, columnSaver); + if (blob.size() >= TCompactionLimits::MAX_BLOB_SIZE) { + Counters.TrashDataSerializationBytes->Add(blob.size()); + Counters.TrashDataSerialization->Add(1); + Counters.TrashDataSerializationHistogramBytes->Collect(blob.size()); + const double kffNew = 1.0 * ExpectedBlobSize / blob.size() * ReduceCorrectionKff; + CurrentStepRecordsCount = currentBatch->num_rows() * kffNew; + Y_VERIFY(CurrentStepRecordsCount); + break; + } else { + Counters.CorrectDataSerializationBytes->Add(blob.size()); + 
Counters.CorrectDataSerialization->Add(1); + } + + portionBlobs[idx] = std::move(blob); + ++fillCounter; + } + + if (fillCounter == portionBlobs.size()) { + Y_VERIFY(fillCounter == portionBlobs.size()); + Position += currentBatch->num_rows(); + Y_VERIFY(Position <= Batch->num_rows()); + ui64 maxBlobSize = 0; + for (auto&& i : portionBlobs) { + Counters.SplittedPortionColumnSize->Collect(i.size()); + maxBlobSize = std::max<ui64>(maxBlobSize, i.size()); + } + batch = currentBatch; + if (maxBlobSize < MinBlobSize) { + if ((Position != currentBatch->num_rows() || Position != Batch->num_rows())) { + Counters.SplittedPortionLargestColumnSize->Collect(maxBlobSize); + Counters.TooSmallBlob->Add(1); + if (Position == Batch->num_rows()) { + Counters.TooSmallBlobFinish->Add(1); + } + if (Position == currentBatch->num_rows()) { + Counters.TooSmallBlobStart->Add(1); + } + } else { + Counters.SimpleSplitPortionLargestColumnSize->Collect(maxBlobSize); + } + CurrentStepRecordsCount = currentBatch->num_rows() * IncreaseCorrectionKff; + } else { + Counters.SplittedPortionLargestColumnSize->Collect(maxBlobSize); + } + return true; + } + } +} + +} diff --git a/ydb/core/tx/columnshard/splitter/splitter.h b/ydb/core/tx/columnshard/splitter/splitter.h new file mode 100644 index 00000000000..6528fd7ab6b --- /dev/null +++ b/ydb/core/tx/columnshard/splitter/splitter.h @@ -0,0 +1,32 @@ +#pragma once +#include <ydb/core/tx/columnshard/counters/indexation.h> +#include <ydb/core/tx/columnshard/engines/scheme/column_features.h> +#include <ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h> +#include <ydb/core/tx/columnshard/engines/storage/granule.h> + +#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h> + +namespace NKikimr::NOlap { + +class TSplitLimiter { +private: + static const inline double ReduceCorrectionKff = 0.9; + static const inline double IncreaseCorrectionKff = 1.1; + static const inline ui64 ExpectedBlobSize = 6 * 1024 * 1024; + static const inline ui64 
MinBlobSize = 1 * 1024 * 1024; + + const NColumnShard::TIndexationCounters Counters; + ui32 BaseStepRecordsCount = 0; + ui32 CurrentStepRecordsCount = 0; + std::shared_ptr<arrow::RecordBatch> Batch; + std::vector<TColumnSummary> SortedColumnIds; + ui32 Position = 0; + ISnapshotSchema::TPtr Schema; +public: + TSplitLimiter(const TGranuleMeta* granuleMeta, const NColumnShard::TIndexationCounters& counters, + ISnapshotSchema::TPtr schema, const std::shared_ptr<arrow::RecordBatch> batch); + + bool Next(std::vector<TString>& portionBlobs, std::shared_ptr<arrow::RecordBatch>& batch, const TSaverContext& saverContext); +}; + +} diff --git a/ydb/core/tx/columnshard/splitter/ya.make b/ydb/core/tx/columnshard/splitter/ya.make new file mode 100644 index 00000000000..88823997832 --- /dev/null +++ b/ydb/core/tx/columnshard/splitter/ya.make @@ -0,0 +1,13 @@ +LIBRARY() + +SRCS( + splitter.cpp +) + +PEERDIR( + contrib/libs/apache/arrow + ydb/core/tx/columnshard/engines/storage + ydb/core/tx/columnshard/engines/scheme +) + +END() diff --git a/ydb/core/tx/columnshard/tables_manager.cpp b/ydb/core/tx/columnshard/tables_manager.cpp index 24afb2c78e0..0633adac11d 100644 --- a/ydb/core/tx/columnshard/tables_manager.cpp +++ b/ydb/core/tx/columnshard/tables_manager.cpp @@ -283,11 +283,6 @@ void TTablesManager::IndexSchemaVersion(const TRowVersion& version, const NKikim } } -std::shared_ptr<NOlap::TCleanupColumnEngineChanges> TTablesManager::StartIndexCleanup(const NOlap::TSnapshot& snapshot, const NOlap::TCompactionLimits& limits, ui32 maxRecords) { - Y_VERIFY(PrimaryIndex); - return PrimaryIndex->StartCleanup(snapshot, limits, PathsToDrop, maxRecords); -} - NOlap::TIndexInfo TTablesManager::DeserializeIndexInfoFromProto(const NKikimrSchemeOp::TColumnTableSchema& schema) { std::optional<NOlap::TIndexInfo> indexInfo = NOlap::TIndexInfo::BuildFromProto(schema); Y_VERIFY(indexInfo); diff --git a/ydb/core/tx/columnshard/tables_manager.h b/ydb/core/tx/columnshard/tables_manager.h index 
8fd3273c1fb..af2f9780704 100644 --- a/ydb/core/tx/columnshard/tables_manager.h +++ b/ydb/core/tx/columnshard/tables_manager.h @@ -150,6 +150,10 @@ public: return PathsToDrop; } + THashSet<ui64>& MutablePathsToDrop() { + return PathsToDrop; + } + const THashMap<ui64, TTableInfo>& GetTables() const { return Tables; } @@ -208,9 +212,6 @@ public: void AddTableVersion(const ui64 pathId, const TRowVersion& version, const TTableInfo::TTableVersionInfo& versionInfo, NIceDb::TNiceDb& db); void OnTtlUpdate(); - - std::shared_ptr<NOlap::TCleanupColumnEngineChanges> StartIndexCleanup(const NOlap::TSnapshot& snapshot, const NOlap::TCompactionLimits& limits, ui32 maxRecords); - private: void IndexSchemaVersion(const TRowVersion& version, const NKikimrSchemeOp::TColumnTableSchema& schema); static NOlap::TIndexInfo DeserializeIndexInfoFromProto(const NKikimrSchemeOp::TColumnTableSchema& schema); diff --git a/ydb/core/tx/columnshard/ya.make b/ydb/core/tx/columnshard/ya.make index 52adbd0fe65..14b1e978ca0 100644 --- a/ydb/core/tx/columnshard/ya.make +++ b/ydb/core/tx/columnshard/ya.make @@ -56,6 +56,7 @@ PEERDIR( ydb/core/tx/columnshard/engines/writer ydb/core/tx/columnshard/counters ydb/core/tx/columnshard/common + ydb/core/tx/columnshard/splitter ydb/core/tx/tiering ydb/core/tx/conveyor/usage ydb/core/tx/long_tx_service/public |