author     nsofya <nsofya@yandex-team.com>  2023-05-05 16:06:03 +0300
committer  nsofya <nsofya@yandex-team.com>  2023-05-05 16:06:03 +0300
commit     bf9252649a385716bb2c02c192302900f1a59d44 (patch)
tree       52133fa1b328367e46c91348b36d68dee3680fbf
parent     cf23a0405e5beab06a2ebd6b4b9905631e97ab72 (diff)
download   ydb-bf9252649a385716bb2c02c192302900f1a59d44.tar.gz
Organize code
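The change replaces the static blob-building entry points on TColumnEngineForLogs (IndexBlobs, CompactBlobs, EvictBlobs) with dedicated logic classes in the new index_logic.h/index_logic.cpp: TIndexationLogic, TCompactionLogic and TEvictionLogic, built on a common TIndexLogicBase. The call-site change, condensed from the compaction_actor.cpp hunk below (per the diffstat, indexing_actor.cpp and eviction_actor.cpp receive the analogous update):

    // Before: static call, index info and tiering map passed in every time.
    TxEvent->Blobs = NOlap::TColumnEngineForLogs::CompactBlobs(
        TxEvent->IndexInfo, TxEvent->Tiering, TxEvent->IndexChanges);

    // After: the logic object captures its context once, then applies the changes.
    NOlap::TCompactionLogic compactionLogic(TxEvent->IndexInfo, TxEvent->Tiering);
    TxEvent->Blobs = compactionLogic.Apply(TxEvent->IndexChanges);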
-rw-r--r--  ydb/core/tx/columnshard/compaction_actor.cpp | 4
-rw-r--r--  ydb/core/tx/columnshard/engines/CMakeLists.darwin-x86_64.txt | 1
-rw-r--r--  ydb/core/tx/columnshard/engines/CMakeLists.linux-aarch64.txt | 1
-rw-r--r--  ydb/core/tx/columnshard/engines/CMakeLists.linux-x86_64.txt | 1
-rw-r--r--  ydb/core/tx/columnshard/engines/CMakeLists.windows-x86_64.txt | 1
-rw-r--r--  ydb/core/tx/columnshard/engines/column_engine_logs.cpp | 752
-rw-r--r--  ydb/core/tx/columnshard/engines/column_engine_logs.h | 11
-rw-r--r--  ydb/core/tx/columnshard/engines/index_logic.cpp | 733
-rw-r--r--  ydb/core/tx/columnshard/engines/index_logic.h | 109
-rw-r--r--  ydb/core/tx/columnshard/engines/ut_logs_engine.cpp | 9
-rw-r--r--  ydb/core/tx/columnshard/eviction_actor.cpp | 6
-rw-r--r--  ydb/core/tx/columnshard/indexing_actor.cpp | 5
12 files changed, 864 insertions, 769 deletions
diff --git a/ydb/core/tx/columnshard/compaction_actor.cpp b/ydb/core/tx/columnshard/compaction_actor.cpp
index e45dae396c7..29e8931d5b5 100644
--- a/ydb/core/tx/columnshard/compaction_actor.cpp
+++ b/ydb/core/tx/columnshard/compaction_actor.cpp
@@ -1,5 +1,6 @@
#include "columnshard_impl.h"
#include <ydb/core/tx/columnshard/engines/column_engine_logs.h>
+#include <ydb/core/tx/columnshard/engines/index_logic.h>
#include "blob_cache.h"
namespace NKikimr::NColumnShard {
@@ -130,7 +131,8 @@ private:
TxEvent->IndexChanges->SetBlobs(std::move(Blobs));
- TxEvent->Blobs = NOlap::TColumnEngineForLogs::CompactBlobs(TxEvent->IndexInfo, TxEvent->Tiering, TxEvent->IndexChanges);
+ NOlap::TCompactionLogic compactionLogic(TxEvent->IndexInfo, TxEvent->Tiering);
+ TxEvent->Blobs = compactionLogic.Apply(TxEvent->IndexChanges);
if (TxEvent->Blobs.empty()) {
TxEvent->PutStatus = NKikimrProto::OK; // nothing to write, commit
}
diff --git a/ydb/core/tx/columnshard/engines/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/engines/CMakeLists.darwin-x86_64.txt
index a7adbff89e2..657bcf12593 100644
--- a/ydb/core/tx/columnshard/engines/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/tx/columnshard/engines/CMakeLists.darwin-x86_64.txt
@@ -33,6 +33,7 @@ target_sources(tx-columnshard-engines PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/index_info.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/index_logic.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/filter.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portion_info.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/predicate.cpp
diff --git a/ydb/core/tx/columnshard/engines/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/engines/CMakeLists.linux-aarch64.txt
index 39ca9cc7e6c..2e34799eb86 100644
--- a/ydb/core/tx/columnshard/engines/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/tx/columnshard/engines/CMakeLists.linux-aarch64.txt
@@ -34,6 +34,7 @@ target_sources(tx-columnshard-engines PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/index_info.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/index_logic.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/filter.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portion_info.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/predicate.cpp
diff --git a/ydb/core/tx/columnshard/engines/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/engines/CMakeLists.linux-x86_64.txt
index 39ca9cc7e6c..2e34799eb86 100644
--- a/ydb/core/tx/columnshard/engines/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/tx/columnshard/engines/CMakeLists.linux-x86_64.txt
@@ -34,6 +34,7 @@ target_sources(tx-columnshard-engines PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/index_info.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/index_logic.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/filter.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portion_info.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/predicate.cpp
diff --git a/ydb/core/tx/columnshard/engines/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/engines/CMakeLists.windows-x86_64.txt
index a7adbff89e2..657bcf12593 100644
--- a/ydb/core/tx/columnshard/engines/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/tx/columnshard/engines/CMakeLists.windows-x86_64.txt
@@ -33,6 +33,7 @@ target_sources(tx-columnshard-engines PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/insert_table.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/index_info.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/index_logic.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/filter.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/portion_info.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/predicate.cpp
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
index 348e9557950..37029a8f3c3 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
@@ -1,47 +1,18 @@
#include "column_engine_logs.h"
#include "indexed_read_data.h"
+#include "index_logic.h"
+
#include "filter.h"
#include <ydb/core/formats/arrow/one_batch_input_stream.h>
#include <ydb/core/formats/arrow/merging_sorted_input_stream.h>
#include <concepts>
-#include <span>
namespace NKikimr::NOlap {
namespace {
-std::shared_ptr<arrow::RecordBatch> GetEffectiveKey(const std::shared_ptr<arrow::RecordBatch>& batch,
- const TIndexInfo& indexInfo) {
- // TODO: composite effective key
- auto columnName = indexInfo.GetPrimaryKey()[0].first;
- auto resBatch = NArrow::ExtractColumns(batch, {std::string(columnName.data(), columnName.size())});
- Y_VERIFY_S(resBatch, "No column '" << columnName << "' in batch " << batch->schema()->ToString());
- return resBatch;
-}
-
-arrow::ipc::IpcWriteOptions WriteOptions(const TCompression& compression) {
- auto& codec = compression.Codec;
-
- arrow::ipc::IpcWriteOptions options(arrow::ipc::IpcWriteOptions::Defaults());
- Y_VERIFY(arrow::util::Codec::IsAvailable(codec));
- arrow::Result<std::unique_ptr<arrow::util::Codec>> resCodec;
- if (compression.Level) {
- resCodec = arrow::util::Codec::Create(codec, *compression.Level);
- if (!resCodec.ok()) {
- resCodec = arrow::util::Codec::Create(codec);
- }
- } else {
- resCodec = arrow::util::Codec::Create(codec);
- }
- Y_VERIFY(resCodec.ok());
-
- options.codec.reset((*resCodec).release());
- options.use_threads = false;
- return options;
-}
-
std::optional<NArrow::TReplaceKey> ExtractKey(const std::shared_ptr<TPredicate>& pkPredicate,
const std::shared_ptr<arrow::Schema>& key) {
if (pkPredicate) {
@@ -51,155 +22,6 @@ std::optional<NArrow::TReplaceKey> ExtractKey(const std::shared_ptr<TPredicate>&
return {};
}
-// Although source batches are ordered only by PK (sorting key) resulting pathBatches are ordered by extended key.
-// They have const snapshot columns that do not break sorting inside batch.
-std::shared_ptr<arrow::RecordBatch> AddSpecials(const std::shared_ptr<arrow::RecordBatch>& srcBatch,
- const TIndexInfo& indexInfo, const TInsertedData& inserted)
-{
- auto batch = TIndexInfo::AddSpecialColumns(srcBatch, inserted.PlanStep(), inserted.TxId());
- Y_VERIFY(batch);
-
- return NArrow::ExtractColumns(batch, indexInfo.ArrowSchemaWithSpecials());
-}
-
-bool UpdateEvictedPortion(TPortionInfo& portionInfo, const TIndexInfo& indexInfo, const THashMap<ui64, NKikimr::NOlap::TTiering>& tieringMap,
- TPortionEvictionFeatures& evictFeatures, const THashMap<TBlobRange, TString>& srcBlobs,
- TVector<TColumnRecord>& evictedRecords, TVector<TString>& newBlobs)
-{
- Y_VERIFY(portionInfo.TierName != evictFeatures.TargetTierName);
-
- auto* tiering = tieringMap.FindPtr(evictFeatures.PathId);
- Y_VERIFY(tiering);
- auto compression = tiering->GetCompression(evictFeatures.TargetTierName);
- if (!compression) {
- // Noting to recompress. We have no other kinds of evictions yet.
- portionInfo.TierName = evictFeatures.TargetTierName;
- evictFeatures.DataChanges = false;
- return true;
- }
-
- Y_VERIFY(!evictFeatures.NeedExport);
-
- auto schema = indexInfo.ArrowSchemaWithSpecials();
- auto batch = portionInfo.AssembleInBatch(indexInfo, schema, srcBlobs);
- auto writeOptions = WriteOptions(*compression);
-
- TPortionInfo undo = portionInfo;
- size_t undoSize = newBlobs.size();
-
- for (auto& rec : portionInfo.Records) {
- auto colName = indexInfo.GetColumnName(rec.ColumnId);
- std::string name(colName.data(), colName.size());
- auto field = schema->GetFieldByName(name);
-
- auto blob = TPortionInfo::SerializeColumn(batch->GetColumnByName(name), field, writeOptions);
- if (blob.size() >= TPortionInfo::BLOB_BYTES_LIMIT) {
- portionInfo = undo;
- newBlobs.resize(undoSize);
- return false;
- }
- newBlobs.emplace_back(std::move(blob));
- rec.BlobRange = TBlobRange{};
- }
-
- for (auto& rec : undo.Records) {
- evictedRecords.emplace_back(std::move(rec));
- }
-
- portionInfo.AddMetadata(indexInfo, batch, evictFeatures.TargetTierName);
- return true;
-}
-
-TVector<TPortionInfo> MakeAppendedPortions(const ui64 pathId, const TIndexInfo& indexInfo,
- const THashMap<ui64, NKikimr::NOlap::TTiering>& tieringMap,
- const std::shared_ptr<arrow::RecordBatch> batch,
- const ui64 granule,
- const TSnapshot& minSnapshot,
- TVector<TString>& blobs) {
- Y_VERIFY(batch->num_rows());
- const auto schema = indexInfo.ArrowSchemaWithSpecials();
- TVector<TPortionInfo> out;
-
- TString tierName;
- TCompression compression = indexInfo.GetDefaultCompression();
- if (pathId) {
- if (auto* tiering = tieringMap.FindPtr(pathId)) {
- tierName = tiering->GetHottestTierName();
- if (const auto& tierCompression = tiering->GetCompression(tierName)) {
- compression = *tierCompression;
- }
- }
- }
- const auto writeOptions = WriteOptions(compression);
-
- std::shared_ptr<arrow::RecordBatch> portionBatch = batch;
- for (i32 pos = 0; pos < batch->num_rows();) {
- Y_VERIFY(portionBatch->num_rows());
-
- TPortionInfo portionInfo;
- portionInfo.Records.reserve(schema->num_fields());
- TVector<TString> portionBlobs;
- portionBlobs.reserve(schema->num_fields());
-
- // Serialize portion's columns into blobs
-
- bool ok = true;
- for (const auto& field : schema->fields()) {
- const auto& name = field->name();
- ui32 columnId = indexInfo.GetColumnId(TString(name.data(), name.size()));
-
- /// @warnign records are not valid cause of empty BlobId and zero Portion
- TColumnRecord record = TColumnRecord::Make(granule, columnId, minSnapshot, 0);
- auto blob = portionInfo.AddOneChunkColumn(portionBatch->GetColumnByName(name), field, std::move(record),
- writeOptions);
- if (!blob.size()) {
- ok = false;
- break;
- }
-
- // TODO: combine small columns in one blob
- portionBlobs.emplace_back(std::move(blob));
- }
-
- if (ok) {
- portionInfo.AddMetadata(indexInfo, portionBatch, tierName);
- out.emplace_back(std::move(portionInfo));
- for (auto& blob : portionBlobs) {
- blobs.push_back(blob);
- }
- pos += portionBatch->num_rows();
- if (pos < batch->num_rows()) {
- portionBatch = batch->Slice(pos);
- }
- } else {
- const i64 halfLen = portionBatch->num_rows() / 2;
- Y_VERIFY(halfLen);
- portionBatch = batch->Slice(pos, halfLen);
- }
- }
-
- return out;
-}
-
-std::vector<std::shared_ptr<arrow::RecordBatch>> PortionsToBatches(const TIndexInfo& indexInfo,
- const TVector<TPortionInfo>& portions,
- const THashMap<TBlobRange, TString>& blobs,
- bool insertedOnly = false) {
- // TODO: schema changes
- const std::shared_ptr<arrow::Schema> schema = indexInfo.ArrowSchemaWithSpecials();
-
- std::vector<std::shared_ptr<arrow::RecordBatch>> batches;
- batches.reserve(portions.size());
-
- for (auto& portionInfo : portions) {
- auto batch = portionInfo.AssembleInBatch(indexInfo, schema, blobs);
- if (!insertedOnly || portionInfo.IsInserted()) {
- batches.push_back(batch);
- }
- }
- return batches;
-}
-
bool InitInGranuleMerge(const TMark& granuleMark, TVector<TPortionInfo>& portions, const TCompactionLimits& limits,
const TSnapshot& snap, TColumnEngineForLogs::TMarksGranules& marksGranules) {
ui64 oldTimePlanStep = snap.PlanStep - TDuration::Seconds(limits.InGranuleCompactSeconds).MilliSeconds();
@@ -326,51 +148,6 @@ TVector<const TPortionInfo*> GetActualPortions(const THashMap<ui64, TPortionInfo
return out;
}
-THashMap<ui64, std::shared_ptr<arrow::RecordBatch>>
-SliceIntoGranules(const std::shared_ptr<arrow::RecordBatch>& batch,
- const std::vector<std::pair<TMark, ui64>>& granules,
- const TIndexInfo& indexInfo)
-{
- Y_VERIFY(batch);
- if (batch->num_rows() == 0) {
- return {};
- }
-
- THashMap<ui64, std::shared_ptr<arrow::RecordBatch>> out;
-
- if (granules.size() == 1) {
- out.emplace(granules[0].second, batch);
- } else {
- const auto effKey = GetEffectiveKey(batch, indexInfo);
- Y_VERIFY(effKey->num_columns() && effKey->num_rows());
-
- std::vector<NArrow::TRawReplaceKey> keys;
- {
- const auto& columns = effKey->columns();
- keys.reserve(effKey->num_rows());
- for (i64 i = 0; i < effKey->num_rows(); ++i) {
- keys.emplace_back(NArrow::TRawReplaceKey(&columns, i));
- }
- }
-
- i64 offset = 0;
- for (size_t i = 0; i < granules.size() && offset < effKey->num_rows(); ++i) {
- const i64 end = (i + 1 == granules.size())
- // Just take the number of elements in the key column for the last granule.
- ? effKey->num_rows()
- // Locate position of the next granule in the key.
- : NArrow::LowerBound(keys, granules[i + 1].first.Border, offset);
-
- if (const i64 size = end - offset) {
- Y_VERIFY(out.emplace(granules[i].second, batch->Slice(offset, size)).second);
- }
-
- offset = end;
- }
- }
- return out;
-}
-
} // namespace
@@ -427,7 +204,7 @@ THashMap<ui64, std::shared_ptr<arrow::RecordBatch>>
TColumnEngineForLogs::TMarksGranules::SliceIntoGranules(const std::shared_ptr<arrow::RecordBatch>& batch,
const TIndexInfo& indexInfo)
{
- return NOlap::SliceIntoGranules(batch, Marks, indexInfo);
+ return NOlap::TIndexLogicBase::SliceIntoGranules(batch, Marks, indexInfo);
}
@@ -1614,527 +1391,4 @@ std::unique_ptr<TCompactionInfo> TColumnEngineForLogs::Compact(ui64& lastCompact
return {};
}
-TVector<TString> TColumnEngineForLogs::IndexBlobs(const TIndexInfo& indexInfo, const THashMap<ui64, NKikimr::NOlap::TTiering>& tieringMap,
- std::shared_ptr<TColumnEngineChanges> indexChanges) {
- auto changes = std::static_pointer_cast<TChanges>(indexChanges);
- Y_VERIFY(!changes->DataToIndex.empty());
- Y_VERIFY(changes->AppendedPortions.empty());
- Y_VERIFY(indexInfo.IsSorted());
-
- TSnapshot& minSnapshot = changes->ApplySnapshot;
- THashMap<ui64, std::vector<std::shared_ptr<arrow::RecordBatch>>> pathBatches;
- for (auto& inserted : changes->DataToIndex) {
- TSnapshot insertSnap{inserted.PlanStep(), inserted.TxId()};
- Y_VERIFY(insertSnap.Valid());
- if (minSnapshot.IsZero() || insertSnap <= minSnapshot) {
- minSnapshot = insertSnap;
- }
-
- TBlobRange blobRange(inserted.BlobId, 0, inserted.BlobId.BlobSize());
-
- std::shared_ptr<arrow::RecordBatch> batch;
- if (auto it = changes->CachedBlobs.find(inserted.BlobId); it != changes->CachedBlobs.end()) {
- batch = it->second;
- } else if (auto* blobData = changes->Blobs.FindPtr(blobRange)) {
- Y_VERIFY(!blobData->empty(), "Blob data not present");
- batch = NArrow::DeserializeBatch(*blobData, indexInfo.ArrowSchema());
- } else {
- Y_VERIFY(blobData, "Data for range %s has not been read", blobRange.ToString().c_str());
- }
- Y_VERIFY(batch);
-
- batch = AddSpecials(batch, indexInfo, inserted);
- pathBatches[inserted.PathId].push_back(batch);
- Y_VERIFY_DEBUG(NArrow::IsSorted(pathBatches[inserted.PathId].back(), indexInfo.GetReplaceKey()));
- }
- Y_VERIFY(minSnapshot.Valid());
-
- TVector<TString> blobs;
-
- for (auto& [pathId, batches] : pathBatches) {
- changes->AddPathIfNotExists(pathId);
-
- // We could merge data here cause tablet limits indexing data portions
-#if 0
- auto merged = NArrow::CombineSortedBatches(batches, indexInfo.SortDescription()); // insert: no replace
- Y_VERIFY(merged);
- Y_VERIFY_DEBUG(NArrow::IsSorted(merged, indexInfo.GetReplaceKey()));
-#else
- auto merged = NArrow::CombineSortedBatches(batches, indexInfo.SortReplaceDescription());
- Y_VERIFY(merged);
- Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(merged, indexInfo.GetReplaceKey()));
-
-#endif
-
- auto granuleBatches = SliceIntoGranules(merged, changes->PathToGranule[pathId], indexInfo);
- for (auto& [granule, batch] : granuleBatches) {
- auto portions = MakeAppendedPortions(pathId, indexInfo, tieringMap, batch, granule, minSnapshot, blobs);
- Y_VERIFY(portions.size() > 0);
- for (auto& portion : portions) {
- changes->AppendedPortions.emplace_back(std::move(portion));
- }
- }
- }
-
- Y_VERIFY(changes->PathToGranule.size() == pathBatches.size());
- return blobs;
-}
-
-static std::shared_ptr<arrow::RecordBatch> CompactInOneGranule(const TIndexInfo& indexInfo, ui64 granule,
- const TVector<TPortionInfo>& portions,
- const THashMap<TBlobRange, TString>& blobs) {
- std::vector<std::shared_ptr<arrow::RecordBatch>> batches;
- batches.reserve(portions.size());
-
- auto schema = indexInfo.ArrowSchemaWithSpecials();
- for (auto& portionInfo : portions) {
- Y_VERIFY(!portionInfo.Empty());
- Y_VERIFY(portionInfo.Granule() == granule);
-
- auto batch = portionInfo.AssembleInBatch(indexInfo, schema, blobs);
- batches.push_back(batch);
- }
-
- auto sortedBatch = NArrow::CombineSortedBatches(batches, indexInfo.SortReplaceDescription());
- Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(sortedBatch, indexInfo.GetReplaceKey()));
-
- return sortedBatch;
-}
-
-static TVector<TString> CompactInGranule(const TIndexInfo& indexInfo, const THashMap<ui64, NKikimr::NOlap::TTiering>& tieringMap,
- std::shared_ptr<TColumnEngineForLogs::TChanges> changes) {
- const ui64 pathId = changes->SrcGranule->PathId;
- TVector<TString> blobs;
- auto& switchedProtions = changes->SwitchedPortions;
- Y_VERIFY(switchedProtions.size());
-
- ui64 granule = switchedProtions[0].Granule();
- auto batch = CompactInOneGranule(indexInfo, granule, switchedProtions, changes->Blobs);
-
- TVector<TPortionInfo> portions;
- if (!changes->MergeBorders.Empty()) {
- Y_VERIFY(changes->MergeBorders.GetOrderedMarks().size() > 1);
- auto slices = changes->MergeBorders.SliceIntoGranules(batch, indexInfo);
- portions.reserve(slices.size());
-
- for (auto& [_, slice] : slices) {
- if (!slice || slice->num_rows() == 0) {
- continue;
- }
- auto tmp = MakeAppendedPortions(pathId, indexInfo, tieringMap, slice, granule, TSnapshot{}, blobs);
- for (auto&& portionInfo : tmp) {
- portions.emplace_back(std::move(portionInfo));
- }
- }
- } else {
- portions = MakeAppendedPortions(pathId, indexInfo, tieringMap, batch, granule, TSnapshot{}, blobs);
- }
-
- Y_VERIFY(portions.size() > 0);
- for (auto& portion : portions) {
- changes->AppendedPortions.emplace_back(std::move(portion));
- }
-
- return blobs;
-}
-
-/// @return vec({ts, batch}). ts0 <= ts1 <= ... <= tsN
-/// @note We use ts from PK for split but there could be lots PK with the same ts.
-static TVector<std::pair<TMark, std::shared_ptr<arrow::RecordBatch>>>
-SliceGranuleBatches(const TIndexInfo& indexInfo,
- const TColumnEngineForLogs::TChanges& changes,
- const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches,
- const TMark& ts0) {
- TVector<std::pair<TMark, std::shared_ptr<arrow::RecordBatch>>> out;
-
- // Extract unique effective keys and their counts
- i64 numRows = 0;
- TMap<NArrow::TReplaceKey, ui32> uniqKeyCount;
- for (const auto& batch : batches) {
- Y_VERIFY(batch);
- if (batch->num_rows() == 0) {
- continue;
- }
-
- numRows += batch->num_rows();
-
- const auto effKey = GetEffectiveKey(batch, indexInfo);
- Y_VERIFY(effKey->num_columns() && effKey->num_rows());
-
- auto effColumns = std::make_shared<NArrow::TArrayVec>(effKey->columns());
- for (int row = 0; row < effKey->num_rows(); ++row) {
- ++uniqKeyCount[NArrow::TReplaceKey(effColumns, row)];
- }
- }
-
- Y_VERIFY(uniqKeyCount.size());
- auto minTs = uniqKeyCount.begin()->first;
- auto maxTs = uniqKeyCount.rbegin()->first;
- Y_VERIFY(minTs >= ts0.Border);
-
- // It's an estimation of needed count cause numRows calculated before key replaces
- ui32 numSplitInto = changes.NumSplitInto(numRows);
- ui32 rowsInGranule = numRows / numSplitInto;
- Y_VERIFY(rowsInGranule);
-
- // Cannot split in case of one unique key
- if (uniqKeyCount.size() == 1) {
- // We have to split big batch of same key in several portions
- auto merged = NArrow::MergeSortedBatches(batches, indexInfo.SortReplaceDescription(), rowsInGranule);
- for (auto& batch : merged) {
- Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(batch, indexInfo.GetReplaceKey()));
- out.emplace_back(ts0, batch);
- }
- return out;
- }
-
- // Make split borders from uniq keys
- TVector<NArrow::TReplaceKey> borders;
- borders.reserve(numRows / rowsInGranule);
- {
- ui32 sumRows = 0;
- for (auto& [ts, num] : uniqKeyCount) {
- if (sumRows >= rowsInGranule) {
- borders.emplace_back(ts);
- sumRows = 0;
- }
- sumRows += num;
- }
- if (borders.empty()) {
- borders.emplace_back(maxTs); // huge trailing key
- }
- Y_VERIFY(borders.size());
- }
-
- // Find offsets in source batches
- TVector<TVector<int>> offsets(batches.size()); // vec[batch][border] = offset
- for (size_t i = 0; i < batches.size(); ++i) {
- const auto& batch = batches[i];
- auto& batchOffsets = offsets[i];
- batchOffsets.reserve(borders.size() + 1);
-
- const auto effKey = GetEffectiveKey(batch, indexInfo);
- Y_VERIFY(effKey->num_columns() && effKey->num_rows());
-
- std::vector<NArrow::TRawReplaceKey> keys;
- {
- const auto& columns = effKey->columns();
- keys.reserve(effKey->num_rows());
- for (i64 i = 0; i < effKey->num_rows(); ++i) {
- keys.emplace_back(NArrow::TRawReplaceKey(&columns, i));
- }
- }
-
- batchOffsets.push_back(0);
- for (const auto& border : borders) {
- int offset = NArrow::LowerBound(keys, border, batchOffsets.back());
- Y_VERIFY(offset >= batchOffsets.back());
- batchOffsets.push_back(offset);
- }
-
- Y_VERIFY(batchOffsets.size() == borders.size() + 1);
- }
-
- // Make merge-sorted granule batch for each splitted granule
- for (ui32 granuleNo = 0; granuleNo < borders.size() + 1; ++granuleNo) {
- std::vector<std::shared_ptr<arrow::RecordBatch>> granuleBatches;
- granuleBatches.reserve(batches.size());
-
- // Extract granule: slice source batches with offsets
- i64 granuleNumRows = 0;
- for (size_t i = 0; i < batches.size(); ++i) {
- const auto& batch = batches[i];
- auto& batchOffsets = offsets[i];
-
- int offset = batchOffsets[granuleNo];
- int end = batch->num_rows();
- if (granuleNo < borders.size()) {
- end = batchOffsets[granuleNo + 1];
- }
- int size = end - offset;
- Y_VERIFY(size >= 0);
-
- if (size) {
- auto slice = batch->Slice(offset, size);
- Y_VERIFY(slice->num_rows());
- granuleNumRows += slice->num_rows();
-#if 1 // Check correctness
- const auto effKey = GetEffectiveKey(slice, indexInfo);
- Y_VERIFY(effKey->num_columns() && effKey->num_rows());
-
- auto startKey = granuleNo ? borders[granuleNo - 1] : minTs;
- Y_VERIFY(NArrow::TReplaceKey::FromBatch(effKey, 0) >= startKey);
-
- NArrow::TReplaceKey lastSliceKey = NArrow::TReplaceKey::FromBatch(effKey, effKey->num_rows() - 1);
- if (granuleNo < borders.size() - 1) {
- const auto& endKey = borders[granuleNo];
- Y_VERIFY(lastSliceKey < endKey);
- } else {
- Y_VERIFY(lastSliceKey <= maxTs);
- }
-#endif
- Y_VERIFY_DEBUG(NArrow::IsSorted(slice, indexInfo.GetReplaceKey()));
- granuleBatches.emplace_back(slice);
- }
- }
-
- // Merge slices. We have to split a big key batches in several ones here.
- if (granuleNumRows > 4 * rowsInGranule) {
- granuleNumRows = rowsInGranule;
- }
- auto merged = NArrow::MergeSortedBatches(granuleBatches, indexInfo.SortReplaceDescription(), granuleNumRows);
- for (auto& batch : merged) {
- Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(batch, indexInfo.GetReplaceKey()));
-
- auto startKey = ts0.Border;
- if (granuleNo) {
- startKey = borders[granuleNo - 1];
- }
-#if 1 // Check correctness
- const auto effKey = GetEffectiveKey(batch, indexInfo);
- Y_VERIFY(effKey->num_columns() && effKey->num_rows());
-
- Y_VERIFY(NArrow::TReplaceKey::FromBatch(effKey, 0) >= startKey);
-#endif
- out.emplace_back(TMark(startKey), batch);
- }
- }
-
- return out;
-}
-
-/// @param[in,out] portions unchanged or only inserted portions in the same orders
-/// @param[in,out] tsIds unchanged or marks from compacted portions ordered by mark
-/// @param[in,out] toMove unchanged or compacted portions ordered by primary key
-static ui64 TryMovePortions(const TMark& ts0,
- TVector<TPortionInfo>& portions,
- std::vector<std::pair<TMark, ui64>>& tsIds,
- TVector<std::pair<TPortionInfo, ui64>>& toMove)
-{
- std::vector<TPortionInfo*> partitioned(portions.size());
- // Split portions by putting the inserted portions in the original order
- // at the beginning of the buffer and the compacted portions at the end.
- // The compacted portions will be put in the reversed order, but it will be sorted later.
- const auto [inserted, compacted] = [&]() {
- size_t l = 0;
- size_t r = portions.size();
-
- for (auto& portionInfo : portions) {
- partitioned[(portionInfo.IsInserted() ? l++ : --r)] = &portionInfo;
- }
-
- return std::make_tuple(std::span(partitioned.begin(), l), std::span(partitioned.begin() + l, partitioned.end()));
- }();
-
- // Do nothing if there are less than two compacted protions.
- if (compacted.size() < 2) {
- return 0;
- }
- // Order compacted portions by primary key.
- std::sort(compacted.begin(), compacted.end(), [](const TPortionInfo* a, const TPortionInfo* b) {
- return a->EffKeyStart() < b->EffKeyStart();
- });
- // Check that there are no gaps between two adjacent portions in term of primary key range.
- for (size_t i = 0; i < compacted.size() - 1; ++i) {
- if (compacted[i]->EffKeyEnd() >= compacted[i + 1]->EffKeyStart()) {
- return 0;
- }
- }
-
- toMove.reserve(compacted.size());
- ui64 numRows = 0;
- ui32 counter = 0;
- for (auto* portionInfo : compacted) {
- ui32 rows = portionInfo->NumRows();
- Y_VERIFY(rows);
- numRows += rows;
- tsIds.emplace_back((counter ? TMark(portionInfo->EffKeyStart()) : ts0), counter + 1);
- toMove.emplace_back(std::move(*portionInfo), counter);
- ++counter;
- // Ensure that std::move will take an effect.
- static_assert(std::swappable<decltype(*portionInfo)>);
- }
-
- std::vector<TPortionInfo> out;
- out.reserve(inserted.size());
- for (auto* portionInfo : inserted) {
- out.emplace_back(std::move(*portionInfo));
- // Ensure that std::move will take an effect.
- static_assert(std::swappable<decltype(*portionInfo)>);
- }
- portions.swap(out);
-
- return numRows;
-}
-
-static TVector<TString> CompactSplitGranule(const TIndexInfo& indexInfo, const THashMap<ui64, NKikimr::NOlap::TTiering>& tieringMap,
- const std::shared_ptr<TColumnEngineForLogs::TChanges>& changes)
-{
- const ui64 pathId = changes->SrcGranule->PathId;
- const TMark ts0 = changes->SrcGranule->Mark;
- TVector<TPortionInfo>& portions = changes->SwitchedPortions;
-
- std::vector<std::pair<TMark, ui64>> tsIds;
- ui64 movedRows = TryMovePortions(ts0, portions, tsIds, changes->PortionsToMove);
- const auto& srcBatches = PortionsToBatches(indexInfo, portions, changes->Blobs, movedRows != 0);
- Y_VERIFY(srcBatches.size() == portions.size());
-
- TVector<TString> blobs;
-
- if (movedRows) {
- Y_VERIFY(changes->PortionsToMove.size() >= 2);
- Y_VERIFY(changes->PortionsToMove.size() == tsIds.size());
- Y_VERIFY(tsIds.begin()->first == ts0);
-
- // Calculate total number of rows.
- ui64 numRows = movedRows;
- for (const auto& batch : srcBatches) {
- numRows += batch->num_rows();
- }
-
- // Recalculate new granules borders (if they are larger then portions)
- ui32 numSplitInto = changes->NumSplitInto(numRows);
- if (numSplitInto < tsIds.size()) {
- const ui32 rowsInGranule = numRows / numSplitInto;
- Y_VERIFY(rowsInGranule);
-
- TVector<std::pair<TMark, ui64>> newTsIds;
- ui32 tmpGranule = 0;
- ui32 sumRows = 0;
- // Always insert mark of the source granule at the beginning.
- newTsIds.emplace_back(ts0, 1);
-
- for (size_t i = 0, end = tsIds.size(); i != end; ++i) {
- const TMark& ts = tsIds[i].first;
- // Make new granule if the current number of rows is exceeded the allowed number of rows in the granule
- // or there is the end of the ids and nothing was inserted so far.
- if (sumRows >= rowsInGranule || (i + 1 == end && newTsIds.size() == 1)) {
- ++tmpGranule;
- newTsIds.emplace_back(ts, tmpGranule + 1);
- sumRows = 0;
- }
-
- auto& toMove = changes->PortionsToMove[i];
- sumRows += toMove.first.NumRows();
- toMove.second = tmpGranule;
- }
-
- tsIds.swap(newTsIds);
- }
- Y_VERIFY(tsIds.size() > 1);
- Y_VERIFY(tsIds[0] == std::make_pair(ts0, ui64(1)));
- TColumnEngineForLogs::TMarksGranules marksGranules(std::move(tsIds));
-
- // Slice inserted portions with granules' borders
- THashMap<ui64, std::vector<std::shared_ptr<arrow::RecordBatch>>> idBatches;
- TVector<TPortionInfo*> toSwitch;
- toSwitch.reserve(portions.size());
- for (size_t i = 0; i < portions.size(); ++i) {
- auto& portion = portions[i];
- auto& batch = srcBatches[i];
- auto slices = marksGranules.SliceIntoGranules(batch, indexInfo);
-
- THashSet<ui64> ids;
- for (auto& [id, slice] : slices) {
- if (slice && slice->num_rows()) {
- ids.insert(id);
- idBatches[id].emplace_back(std::move(slice));
- }
- }
-
- // Optimization: move not splitted inserted portions. Do not reappend them.
- if (ids.size() == 1) {
- ui64 id = *ids.begin();
- idBatches[id].resize(idBatches[id].size() - 1);
- ui64 tmpGranule = id - 1;
- changes->PortionsToMove.emplace_back(std::move(portion), tmpGranule);
- } else {
- toSwitch.push_back(&portion);
- }
- }
-
- // Update switchedPortions if we have moves
- if (toSwitch.size() != portions.size()) {
- TVector<TPortionInfo> tmp;
- tmp.reserve(toSwitch.size());
- for (auto* portionInfo : toSwitch) {
- tmp.emplace_back(std::move(*portionInfo));
- }
- portions.swap(tmp);
- }
-
- for (const auto& [mark, id] : marksGranules.GetOrderedMarks()) {
- ui64 tmpGranule = changes->SetTmpGranule(pathId, mark);
-
- for (const auto& batch : idBatches[id]) {
- // Cannot set snapshot here. It would be set in committing transaction in ApplyChanges().
- auto newPortions = MakeAppendedPortions(pathId, indexInfo, tieringMap, batch, tmpGranule, TSnapshot{}, blobs);
- Y_VERIFY(newPortions.size() > 0);
- for (auto& portion : newPortions) {
- changes->AppendedPortions.emplace_back(std::move(portion));
- }
- }
- }
- } else {
- auto batches = SliceGranuleBatches(indexInfo, *changes, srcBatches, ts0);
-
- changes->SetTmpGranule(pathId, ts0);
- for (auto& [ts, batch] : batches) {
- // Tmp granule would be updated to correct value in ApplyChanges()
- ui64 tmpGranule = changes->SetTmpGranule(pathId, ts);
-
- // Cannot set snapshot here. It would be set in committing transaction in ApplyChanges().
- auto portions = MakeAppendedPortions(pathId, indexInfo, tieringMap, batch, tmpGranule, TSnapshot{}, blobs);
- Y_VERIFY(portions.size() > 0);
- for (auto& portion : portions) {
- changes->AppendedPortions.emplace_back(std::move(portion));
- }
- }
- }
-
- return blobs;
-}
-
-TVector<TString> TColumnEngineForLogs::CompactBlobs(const TIndexInfo& indexInfo, const THashMap<ui64, NKikimr::NOlap::TTiering>& tieringMap,
- std::shared_ptr<TColumnEngineChanges> changes) {
- Y_VERIFY(changes);
- Y_VERIFY(changes->CompactionInfo);
- Y_VERIFY(changes->DataToIndex.empty()); // not used
- Y_VERIFY(!changes->Blobs.empty()); // src data
- Y_VERIFY(!changes->SwitchedPortions.empty()); // src meta
- Y_VERIFY(changes->AppendedPortions.empty()); // dst meta
-
- auto castedChanges = std::static_pointer_cast<TChanges>(changes);
- if (castedChanges->CompactionInfo->InGranule) {
- return CompactInGranule(indexInfo, tieringMap, castedChanges);
- }
- return CompactSplitGranule(indexInfo, tieringMap, castedChanges);
-}
-
-TVector<TString> TColumnEngineForLogs::EvictBlobs(const TIndexInfo& indexInfo, const THashMap<ui64, NKikimr::NOlap::TTiering>& tieringMap,
- std::shared_ptr<TColumnEngineChanges> changes) {
- Y_VERIFY(changes);
- Y_VERIFY(!changes->Blobs.empty()); // src data
- Y_VERIFY(!changes->PortionsToEvict.empty()); // src meta
- Y_VERIFY(changes->EvictedRecords.empty()); // dst meta
-
- TVector<TString> newBlobs;
- TVector<std::pair<TPortionInfo, TPortionEvictionFeatures>> evicted;
- evicted.reserve(changes->PortionsToEvict.size());
-
- for (auto& [portionInfo, evictFeatures] : changes->PortionsToEvict) {
- Y_VERIFY(!portionInfo.Empty());
- Y_VERIFY(portionInfo.IsActive());
-
- if (UpdateEvictedPortion(portionInfo, indexInfo, tieringMap, evictFeatures, changes->Blobs,
- changes->EvictedRecords, newBlobs)) {
- Y_VERIFY(portionInfo.TierName == evictFeatures.TargetTierName);
- evicted.emplace_back(std::move(portionInfo), evictFeatures);
- }
- }
-
- changes->PortionsToEvict.swap(evicted);
- return newBlobs;
-}
-
}
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.h b/ydb/core/tx/columnshard/engines/column_engine_logs.h
index c23eedd1f7b..e615064f628 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.h
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.h
@@ -218,17 +218,6 @@ public:
std::shared_ptr<TPredicate> to) const override;
std::unique_ptr<TCompactionInfo> Compact(ui64& lastCompactedGranule) override;
- // Static part of IColumnEngine iface (called from actors). It's static cause there's no threads sync.
-
- /// @note called from IndexingActor
- static TVector<TString> IndexBlobs(const TIndexInfo& indexInfo, const THashMap<ui64, NKikimr::NOlap::TTiering>& tieringMap, std::shared_ptr<TColumnEngineChanges> changes);
-
- /// @note called from CompactionActor
- static TVector<TString> CompactBlobs(const TIndexInfo& indexInfo, const THashMap<ui64, NKikimr::NOlap::TTiering>& tieringMap, std::shared_ptr<TColumnEngineChanges> changes);
-
- /// @note called from EvictionActor
- static TVector<TString> EvictBlobs(const TIndexInfo& indexInfo, const THashMap<ui64, NKikimr::NOlap::TTiering>& tieringMap, std::shared_ptr<TColumnEngineChanges> changes);
-
private:
struct TGranuleMeta {
const TGranuleRecord Record;
diff --git a/ydb/core/tx/columnshard/engines/index_logic.cpp b/ydb/core/tx/columnshard/engines/index_logic.cpp
new file mode 100644
index 00000000000..e02dd2ebb6d
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/index_logic.cpp
@@ -0,0 +1,733 @@
+#include "index_logic.h"
+
+#include <span>
+
+namespace NKikimr::NOlap {
+
+std::shared_ptr<arrow::RecordBatch> TIndexLogicBase::GetEffectiveKey(const std::shared_ptr<arrow::RecordBatch>& batch,
+ const TIndexInfo& indexInfo) {
+ // TODO: composite effective key
+ auto columnName = indexInfo.GetPrimaryKey()[0].first;
+ auto resBatch = NArrow::ExtractColumns(batch, {std::string(columnName.data(), columnName.size())});
+ Y_VERIFY_S(resBatch, "No column '" << columnName << "' in batch " << batch->schema()->ToString());
+ return resBatch;
+}
+
+arrow::ipc::IpcWriteOptions WriteOptions(const TCompression& compression) {
+ auto& codec = compression.Codec;
+
+ arrow::ipc::IpcWriteOptions options(arrow::ipc::IpcWriteOptions::Defaults());
+ Y_VERIFY(arrow::util::Codec::IsAvailable(codec));
+ arrow::Result<std::unique_ptr<arrow::util::Codec>> resCodec;
+ if (compression.Level) {
+ resCodec = arrow::util::Codec::Create(codec, *compression.Level);
+ if (!resCodec.ok()) {
+ resCodec = arrow::util::Codec::Create(codec);
+ }
+ } else {
+ resCodec = arrow::util::Codec::Create(codec);
+ }
+ Y_VERIFY(resCodec.ok());
+
+ options.codec.reset((*resCodec).release());
+ options.use_threads = false;
+ return options;
+}
+
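+// Although source batches are ordered only by PK (the sorting key), the resulting pathBatches are ordered by the extended key.
+// They have const snapshot columns that do not break sorting inside a batch.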
+std::shared_ptr<arrow::RecordBatch> TIndexationLogic::AddSpecials(const std::shared_ptr<arrow::RecordBatch>& srcBatch,
+ const TIndexInfo& indexInfo, const TInsertedData& inserted) const {
+ auto batch = TIndexInfo::AddSpecialColumns(srcBatch, inserted.PlanStep(), inserted.TxId());
+ Y_VERIFY(batch);
+
+ return NArrow::ExtractColumns(batch, indexInfo.ArrowSchemaWithSpecials());
+}
+
+bool TEvictionLogic::UpdateEvictedPortion(TPortionInfo& portionInfo,
+ TPortionEvictionFeatures& evictFeatures, const THashMap<TBlobRange, TString>& srcBlobs,
+ TVector<TColumnRecord>& evictedRecords, TVector<TString>& newBlobs) const {
+ Y_VERIFY(portionInfo.TierName != evictFeatures.TargetTierName);
+
+ auto* tiering = GetTieringMap().FindPtr(evictFeatures.PathId);
+ Y_VERIFY(tiering);
+ auto compression = tiering->GetCompression(evictFeatures.TargetTierName);
+ if (!compression) {
+ // Nothing to recompress. We have no other kinds of evictions yet.
+ portionInfo.TierName = evictFeatures.TargetTierName;
+ evictFeatures.DataChanges = false;
+ return true;
+ }
+
+ Y_VERIFY(!evictFeatures.NeedExport);
+
+ auto schema = IndexInfo.ArrowSchemaWithSpecials();
+ auto batch = portionInfo.AssembleInBatch(IndexInfo, schema, srcBlobs);
+ auto writeOptions = WriteOptions(*compression);
+
+ TPortionInfo undo = portionInfo;
+ size_t undoSize = newBlobs.size();
+
+ for (auto& rec : portionInfo.Records) {
+ auto colName = IndexInfo.GetColumnName(rec.ColumnId);
+ std::string name(colName.data(), colName.size());
+ auto field = schema->GetFieldByName(name);
+
+ auto blob = TPortionInfo::SerializeColumn(batch->GetColumnByName(name), field, writeOptions);
+ if (blob.size() >= TPortionInfo::BLOB_BYTES_LIMIT) {
+ portionInfo = undo;
+ newBlobs.resize(undoSize);
+ return false;
+ }
+ newBlobs.emplace_back(std::move(blob));
+ rec.BlobRange = TBlobRange{};
+ }
+
+ for (auto& rec : undo.Records) {
+ evictedRecords.emplace_back(std::move(rec));
+ }
+
+ portionInfo.AddMetadata(IndexInfo, batch, evictFeatures.TargetTierName);
+ return true;
+}
+
+TVector<TPortionInfo> TIndexLogicBase::MakeAppendedPortions(const ui64 pathId,
+ const std::shared_ptr<arrow::RecordBatch> batch,
+ const ui64 granule,
+ const TSnapshot& minSnapshot,
+ TVector<TString>& blobs) const {
+ Y_VERIFY(batch->num_rows());
+ const auto schema = IndexInfo.ArrowSchemaWithSpecials();
+ TVector<TPortionInfo> out;
+
+ TString tierName;
+ TCompression compression = IndexInfo.GetDefaultCompression();
+ if (pathId) {
+ if (auto* tiering = GetTieringMap().FindPtr(pathId)) {
+ tierName = tiering->GetHottestTierName();
+ if (const auto& tierCompression = tiering->GetCompression(tierName)) {
+ compression = *tierCompression;
+ }
+ }
+ }
+ const auto writeOptions = WriteOptions(compression);
+
+ std::shared_ptr<arrow::RecordBatch> portionBatch = batch;
+ for (i32 pos = 0; pos < batch->num_rows();) {
+ Y_VERIFY(portionBatch->num_rows());
+
+ TPortionInfo portionInfo;
+ portionInfo.Records.reserve(schema->num_fields());
+ TVector<TString> portionBlobs;
+ portionBlobs.reserve(schema->num_fields());
+
+ // Serialize portion's columns into blobs
+
+ bool ok = true;
+ for (const auto& field : schema->fields()) {
+ const auto& name = field->name();
+ ui32 columnId = IndexInfo.GetColumnId(TString(name.data(), name.size()));
+
+ /// @warning records are not valid because of the empty BlobId and zero Portion
+ TColumnRecord record = TColumnRecord::Make(granule, columnId, minSnapshot, 0);
+ auto blob = portionInfo.AddOneChunkColumn(portionBatch->GetColumnByName(name), field, std::move(record),
+ writeOptions);
+ if (!blob.size()) {
+ ok = false;
+ break;
+ }
+
+ // TODO: combine small columns in one blob
+ portionBlobs.emplace_back(std::move(blob));
+ }
+
+ if (ok) {
+ portionInfo.AddMetadata(IndexInfo, portionBatch, tierName);
+ out.emplace_back(std::move(portionInfo));
+ for (auto& blob : portionBlobs) {
+ blobs.push_back(blob);
+ }
+ pos += portionBatch->num_rows();
+ if (pos < batch->num_rows()) {
+ portionBatch = batch->Slice(pos);
+ }
+ } else {
+ const i64 halfLen = portionBatch->num_rows() / 2;
+ Y_VERIFY(halfLen);
+ portionBatch = batch->Slice(pos, halfLen);
+ }
+ }
+
+ return out;
+}
+
+std::vector<std::shared_ptr<arrow::RecordBatch>> TCompactionLogic::PortionsToBatches(const TVector<TPortionInfo>& portions,
+ const THashMap<TBlobRange, TString>& blobs,
+ bool insertedOnly) const {
+ // TODO: schema changes
+ const std::shared_ptr<arrow::Schema> schema = IndexInfo.ArrowSchemaWithSpecials();
+
+ std::vector<std::shared_ptr<arrow::RecordBatch>> batches;
+ batches.reserve(portions.size());
+
+ for (auto& portionInfo : portions) {
+ auto batch = portionInfo.AssembleInBatch(IndexInfo, schema, blobs);
+ if (!insertedOnly || portionInfo.IsInserted()) {
+ batches.push_back(batch);
+ }
+ }
+ return batches;
+}
+
+THashMap<ui64, std::shared_ptr<arrow::RecordBatch>> TIndexLogicBase::SliceIntoGranules(const std::shared_ptr<arrow::RecordBatch>& batch,
+ const std::vector<std::pair<TMark, ui64>>& granules,
+ const TIndexInfo& indexInfo) {
+ Y_VERIFY(batch);
+ if (batch->num_rows() == 0) {
+ return {};
+ }
+
+ THashMap<ui64, std::shared_ptr<arrow::RecordBatch>> out;
+
+ if (granules.size() == 1) {
+ out.emplace(granules[0].second, batch);
+ } else {
+ const auto effKey = GetEffectiveKey(batch, indexInfo);
+ Y_VERIFY(effKey->num_columns() && effKey->num_rows());
+
+ std::vector<NArrow::TRawReplaceKey> keys;
+ {
+ const auto& columns = effKey->columns();
+ keys.reserve(effKey->num_rows());
+ for (i64 i = 0; i < effKey->num_rows(); ++i) {
+ keys.emplace_back(NArrow::TRawReplaceKey(&columns, i));
+ }
+ }
+
+ i64 offset = 0;
+ for (size_t i = 0; i < granules.size() && offset < effKey->num_rows(); ++i) {
+ const i64 end = (i + 1 == granules.size())
+ // Just take the number of elements in the key column for the last granule.
+ ? effKey->num_rows()
+ // Locate position of the next granule in the key.
+ : NArrow::LowerBound(keys, granules[i + 1].first.Border, offset);
+
+ if (const i64 size = end - offset) {
+ Y_VERIFY(out.emplace(granules[i].second, batch->Slice(offset, size)).second);
+ }
+
+ offset = end;
+ }
+ }
+ return out;
+}
+
+TVector<TString> TIndexationLogic::Apply(std::shared_ptr<TColumnEngineChanges> indexChanges) const {
+ auto changes = std::static_pointer_cast<TColumnEngineForLogs::TChanges>(indexChanges);
+ Y_VERIFY(!changes->DataToIndex.empty());
+ Y_VERIFY(changes->AppendedPortions.empty());
+ Y_VERIFY(IndexInfo.IsSorted());
+
+ TSnapshot& minSnapshot = changes->ApplySnapshot;
+ THashMap<ui64, std::vector<std::shared_ptr<arrow::RecordBatch>>> pathBatches;
+ for (auto& inserted : changes->DataToIndex) {
+ TSnapshot insertSnap{inserted.PlanStep(), inserted.TxId()};
+ Y_VERIFY(insertSnap.Valid());
+ if (minSnapshot.IsZero() || insertSnap <= minSnapshot) {
+ minSnapshot = insertSnap;
+ }
+
+ TBlobRange blobRange(inserted.BlobId, 0, inserted.BlobId.BlobSize());
+
+ std::shared_ptr<arrow::RecordBatch> batch;
+ if (auto it = changes->CachedBlobs.find(inserted.BlobId); it != changes->CachedBlobs.end()) {
+ batch = it->second;
+ } else if (auto* blobData = changes->Blobs.FindPtr(blobRange)) {
+ Y_VERIFY(!blobData->empty(), "Blob data not present");
+ batch = NArrow::DeserializeBatch(*blobData, IndexInfo.ArrowSchema());
+ } else {
+ Y_VERIFY(blobData, "Data for range %s has not been read", blobRange.ToString().c_str());
+ }
+ Y_VERIFY(batch);
+
+ batch = AddSpecials(batch, IndexInfo, inserted);
+ pathBatches[inserted.PathId].push_back(batch);
+ Y_VERIFY_DEBUG(NArrow::IsSorted(pathBatches[inserted.PathId].back(), IndexInfo.GetReplaceKey()));
+ }
+ Y_VERIFY(minSnapshot.Valid());
+
+ TVector<TString> blobs;
+
+ for (auto& [pathId, batches] : pathBatches) {
+ changes->AddPathIfNotExists(pathId);
+
+ // We could merge data here because the tablet limits indexing data portions
+#if 0
+ auto merged = NArrow::CombineSortedBatches(batches, IndexInfo.SortDescription()); // insert: no replace
+ Y_VERIFY(merged);
+ Y_VERIFY_DEBUG(NArrow::IsSorted(merged, IndexInfo.GetReplaceKey()));
+#else
+ auto merged = NArrow::CombineSortedBatches(batches, IndexInfo.SortReplaceDescription());
+ Y_VERIFY(merged);
+ Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(merged, IndexInfo.GetReplaceKey()));
+
+#endif
+
+ auto granuleBatches = SliceIntoGranules(merged, changes->PathToGranule[pathId], IndexInfo);
+ for (auto& [granule, batch] : granuleBatches) {
+ auto portions = MakeAppendedPortions(pathId, batch, granule, minSnapshot, blobs);
+ Y_VERIFY(portions.size() > 0);
+ for (auto& portion : portions) {
+ changes->AppendedPortions.emplace_back(std::move(portion));
+ }
+ }
+ }
+
+ Y_VERIFY(changes->PathToGranule.size() == pathBatches.size());
+ return blobs;
+}
+
+std::shared_ptr<arrow::RecordBatch> TCompactionLogic::CompactInOneGranule(ui64 granule,
+ const TVector<TPortionInfo>& portions,
+ const THashMap<TBlobRange, TString>& blobs) const {
+ std::vector<std::shared_ptr<arrow::RecordBatch>> batches;
+ batches.reserve(portions.size());
+
+ auto schema = IndexInfo.ArrowSchemaWithSpecials();
+ for (auto& portionInfo : portions) {
+ Y_VERIFY(!portionInfo.Empty());
+ Y_VERIFY(portionInfo.Granule() == granule);
+
+ auto batch = portionInfo.AssembleInBatch(IndexInfo, schema, blobs);
+ batches.push_back(batch);
+ }
+
+ auto sortedBatch = NArrow::CombineSortedBatches(batches, IndexInfo.SortReplaceDescription());
+ Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(sortedBatch, IndexInfo.GetReplaceKey()));
+
+ return sortedBatch;
+}
+
+TVector<TString> TCompactionLogic::CompactInGranule(std::shared_ptr<TColumnEngineForLogs::TChanges> changes) const {
+ const ui64 pathId = changes->SrcGranule->PathId;
+ TVector<TString> blobs;
+ auto& switchedPortions = changes->SwitchedPortions;
+ Y_VERIFY(switchedPortions.size());
+
+ ui64 granule = switchedPortions[0].Granule();
+ auto batch = CompactInOneGranule(granule, switchedPortions, changes->Blobs);
+
+ TVector<TPortionInfo> portions;
+ if (!changes->MergeBorders.Empty()) {
+ Y_VERIFY(changes->MergeBorders.GetOrderedMarks().size() > 1);
+ auto slices = changes->MergeBorders.SliceIntoGranules(batch, IndexInfo);
+ portions.reserve(slices.size());
+
+ for (auto& [_, slice] : slices) {
+ if (!slice || slice->num_rows() == 0) {
+ continue;
+ }
+ auto tmp = MakeAppendedPortions(pathId, slice, granule, TSnapshot{}, blobs);
+ for (auto&& portionInfo : tmp) {
+ portions.emplace_back(std::move(portionInfo));
+ }
+ }
+ } else {
+ portions = MakeAppendedPortions(pathId, batch, granule, TSnapshot{}, blobs);
+ }
+
+ Y_VERIFY(portions.size() > 0);
+ for (auto& portion : portions) {
+ changes->AppendedPortions.emplace_back(std::move(portion));
+ }
+
+ return blobs;
+}
+
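+/// @return vec({ts, batch}). ts0 <= ts1 <= ... <= tsN
+/// @note We use ts from PK for the split, but there can be many PKs with the same ts.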
+TVector<std::pair<TMark, std::shared_ptr<arrow::RecordBatch>>>
+TCompactionLogic::SliceGranuleBatches(const TIndexInfo& indexInfo,
+ const TColumnEngineForLogs::TChanges& changes,
+ const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches,
+ const TMark& ts0) const {
+ TVector<std::pair<TMark, std::shared_ptr<arrow::RecordBatch>>> out;
+
+ // Extract unique effective keys and their counts
+ i64 numRows = 0;
+ TMap<NArrow::TReplaceKey, ui32> uniqKeyCount;
+ for (const auto& batch : batches) {
+ Y_VERIFY(batch);
+ if (batch->num_rows() == 0) {
+ continue;
+ }
+
+ numRows += batch->num_rows();
+
+ const auto effKey = GetEffectiveKey(batch, indexInfo);
+ Y_VERIFY(effKey->num_columns() && effKey->num_rows());
+
+ auto effColumns = std::make_shared<NArrow::TArrayVec>(effKey->columns());
+ for (int row = 0; row < effKey->num_rows(); ++row) {
+ ++uniqKeyCount[NArrow::TReplaceKey(effColumns, row)];
+ }
+ }
+
+ Y_VERIFY(uniqKeyCount.size());
+ auto minTs = uniqKeyCount.begin()->first;
+ auto maxTs = uniqKeyCount.rbegin()->first;
+ Y_VERIFY(minTs >= ts0.Border);
+
+ // It's an estimate of the needed count, because numRows is calculated before key replacement
+ ui32 numSplitInto = changes.NumSplitInto(numRows);
+ ui32 rowsInGranule = numRows / numSplitInto;
+ Y_VERIFY(rowsInGranule);
+
+ // Cannot split in case of one unique key
+ if (uniqKeyCount.size() == 1) {
+ // We have to split a big batch with the same key into several portions
+ auto merged = NArrow::MergeSortedBatches(batches, indexInfo.SortReplaceDescription(), rowsInGranule);
+ for (auto& batch : merged) {
+ Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(batch, indexInfo.GetReplaceKey()));
+ out.emplace_back(ts0, batch);
+ }
+ return out;
+ }
+
+ // Make split borders from unique keys
+ TVector<NArrow::TReplaceKey> borders;
+ borders.reserve(numRows / rowsInGranule);
+ {
+ ui32 sumRows = 0;
+ for (auto& [ts, num] : uniqKeyCount) {
+ if (sumRows >= rowsInGranule) {
+ borders.emplace_back(ts);
+ sumRows = 0;
+ }
+ sumRows += num;
+ }
+ if (borders.empty()) {
+ borders.emplace_back(maxTs); // huge trailing key
+ }
+ Y_VERIFY(borders.size());
+ }
+
+ // Find offsets in source batches
+ TVector<TVector<int>> offsets(batches.size()); // vec[batch][border] = offset
+ for (size_t i = 0; i < batches.size(); ++i) {
+ const auto& batch = batches[i];
+ auto& batchOffsets = offsets[i];
+ batchOffsets.reserve(borders.size() + 1);
+
+ const auto effKey = GetEffectiveKey(batch, indexInfo);
+ Y_VERIFY(effKey->num_columns() && effKey->num_rows());
+
+ std::vector<NArrow::TRawReplaceKey> keys;
+ {
+ const auto& columns = effKey->columns();
+ keys.reserve(effKey->num_rows());
+ for (i64 i = 0; i < effKey->num_rows(); ++i) {
+ keys.emplace_back(NArrow::TRawReplaceKey(&columns, i));
+ }
+ }
+
+ batchOffsets.push_back(0);
+ for (const auto& border : borders) {
+ int offset = NArrow::LowerBound(keys, border, batchOffsets.back());
+ Y_VERIFY(offset >= batchOffsets.back());
+ batchOffsets.push_back(offset);
+ }
+
+ Y_VERIFY(batchOffsets.size() == borders.size() + 1);
+ }
+
+ // Make a merge-sorted granule batch for each split granule
+ for (ui32 granuleNo = 0; granuleNo < borders.size() + 1; ++granuleNo) {
+ std::vector<std::shared_ptr<arrow::RecordBatch>> granuleBatches;
+ granuleBatches.reserve(batches.size());
+
+ // Extract granule: slice source batches with offsets
+ i64 granuleNumRows = 0;
+ for (size_t i = 0; i < batches.size(); ++i) {
+ const auto& batch = batches[i];
+ auto& batchOffsets = offsets[i];
+
+ int offset = batchOffsets[granuleNo];
+ int end = batch->num_rows();
+ if (granuleNo < borders.size()) {
+ end = batchOffsets[granuleNo + 1];
+ }
+ int size = end - offset;
+ Y_VERIFY(size >= 0);
+
+ if (size) {
+ auto slice = batch->Slice(offset, size);
+ Y_VERIFY(slice->num_rows());
+ granuleNumRows += slice->num_rows();
+#if 1 // Check correctness
+ const auto effKey = GetEffectiveKey(slice, indexInfo);
+ Y_VERIFY(effKey->num_columns() && effKey->num_rows());
+
+ auto startKey = granuleNo ? borders[granuleNo - 1] : minTs;
+ Y_VERIFY(NArrow::TReplaceKey::FromBatch(effKey, 0) >= startKey);
+
+ NArrow::TReplaceKey lastSliceKey = NArrow::TReplaceKey::FromBatch(effKey, effKey->num_rows() - 1);
+ if (granuleNo < borders.size() - 1) {
+ const auto& endKey = borders[granuleNo];
+ Y_VERIFY(lastSliceKey < endKey);
+ } else {
+ Y_VERIFY(lastSliceKey <= maxTs);
+ }
+#endif
+ Y_VERIFY_DEBUG(NArrow::IsSorted(slice, indexInfo.GetReplaceKey()));
+ granuleBatches.emplace_back(slice);
+ }
+ }
+
+ // Merge slices. We have to split big single-key batches into several ones here.
+ if (granuleNumRows > 4 * rowsInGranule) {
+ granuleNumRows = rowsInGranule;
+ }
+ auto merged = NArrow::MergeSortedBatches(granuleBatches, indexInfo.SortReplaceDescription(), granuleNumRows);
+ for (auto& batch : merged) {
+ Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(batch, indexInfo.GetReplaceKey()));
+
+ auto startKey = ts0.Border;
+ if (granuleNo) {
+ startKey = borders[granuleNo - 1];
+ }
+#if 1 // Check correctness
+ const auto effKey = GetEffectiveKey(batch, indexInfo);
+ Y_VERIFY(effKey->num_columns() && effKey->num_rows());
+
+ Y_VERIFY(NArrow::TReplaceKey::FromBatch(effKey, 0) >= startKey);
+#endif
+ out.emplace_back(TMark(startKey), batch);
+ }
+ }
+
+ return out;
+}
+
+ui64 TCompactionLogic::TryMovePortions(const TMark& ts0,
+ TVector<TPortionInfo>& portions,
+ std::vector<std::pair<TMark, ui64>>& tsIds,
+ TVector<std::pair<TPortionInfo, ui64>>& toMove) const {
+ std::vector<TPortionInfo*> partitioned(portions.size());
+ // Split portions by putting the inserted portions in the original order
+ // at the beginning of the buffer and the compacted portions at the end.
+ // The compacted portions will be put in reversed order, but they will be sorted later.
+ const auto [inserted, compacted] = [&]() {
+ size_t l = 0;
+ size_t r = portions.size();
+
+ for (auto& portionInfo : portions) {
+ partitioned[(portionInfo.IsInserted() ? l++ : --r)] = &portionInfo;
+ }
+
+ return std::make_tuple(std::span(partitioned.begin(), l), std::span(partitioned.begin() + l, partitioned.end()));
+ }();
+
+ // Do nothing if there are fewer than two compacted portions.
+ if (compacted.size() < 2) {
+ return 0;
+ }
+ // Order compacted portions by primary key.
+ std::sort(compacted.begin(), compacted.end(), [](const TPortionInfo* a, const TPortionInfo* b) {
+ return a->EffKeyStart() < b->EffKeyStart();
+ });
+ // Check that there are no gaps between two adjacent portions in terms of primary key range.
+ for (size_t i = 0; i < compacted.size() - 1; ++i) {
+ if (compacted[i]->EffKeyEnd() >= compacted[i + 1]->EffKeyStart()) {
+ return 0;
+ }
+ }
+
+ toMove.reserve(compacted.size());
+ ui64 numRows = 0;
+ ui32 counter = 0;
+ for (auto* portionInfo : compacted) {
+ ui32 rows = portionInfo->NumRows();
+ Y_VERIFY(rows);
+ numRows += rows;
+ tsIds.emplace_back((counter ? TMark(portionInfo->EffKeyStart()) : ts0), counter + 1);
+ toMove.emplace_back(std::move(*portionInfo), counter);
+ ++counter;
+ // Ensure that std::move will take effect.
+ static_assert(std::swappable<decltype(*portionInfo)>);
+ }
+
+ std::vector<TPortionInfo> out;
+ out.reserve(inserted.size());
+ for (auto* portionInfo : inserted) {
+ out.emplace_back(std::move(*portionInfo));
+ // Ensure that std::move will take effect.
+ static_assert(std::swappable<decltype(*portionInfo)>);
+ }
+ portions.swap(out);
+
+ return numRows;
+}
+
+TVector<TString> TCompactionLogic::CompactSplitGranule(const std::shared_ptr<TColumnEngineForLogs::TChanges>& changes) const {
+ const ui64 pathId = changes->SrcGranule->PathId;
+ const TMark ts0 = changes->SrcGranule->Mark;
+ TVector<TPortionInfo>& portions = changes->SwitchedPortions;
+
+ std::vector<std::pair<TMark, ui64>> tsIds;
+ ui64 movedRows = TryMovePortions(ts0, portions, tsIds, changes->PortionsToMove);
+ const auto& srcBatches = PortionsToBatches(portions, changes->Blobs, movedRows != 0);
+ Y_VERIFY(srcBatches.size() == portions.size());
+
+ TVector<TString> blobs;
+
+ if (movedRows) {
+ Y_VERIFY(changes->PortionsToMove.size() >= 2);
+ Y_VERIFY(changes->PortionsToMove.size() == tsIds.size());
+ Y_VERIFY(tsIds.begin()->first == ts0);
+
+ // Calculate total number of rows.
+ ui64 numRows = movedRows;
+ for (const auto& batch : srcBatches) {
+ numRows += batch->num_rows();
+ }
+
+ // Recalculate new granule borders (if they are larger than portions)
+ ui32 numSplitInto = changes->NumSplitInto(numRows);
+ if (numSplitInto < tsIds.size()) {
+ const ui32 rowsInGranule = numRows / numSplitInto;
+ Y_VERIFY(rowsInGranule);
+
+ TVector<std::pair<TMark, ui64>> newTsIds;
+ ui32 tmpGranule = 0;
+ ui32 sumRows = 0;
+ // Always insert mark of the source granule at the beginning.
+ newTsIds.emplace_back(ts0, 1);
+
+ for (size_t i = 0, end = tsIds.size(); i != end; ++i) {
+ const TMark& ts = tsIds[i].first;
+ // Make a new granule if the current number of rows exceeds the allowed number of rows in the granule,
+ // or if we are at the end of the ids and nothing has been inserted so far.
+ if (sumRows >= rowsInGranule || (i + 1 == end && newTsIds.size() == 1)) {
+ ++tmpGranule;
+ newTsIds.emplace_back(ts, tmpGranule + 1);
+ sumRows = 0;
+ }
+
+ auto& toMove = changes->PortionsToMove[i];
+ sumRows += toMove.first.NumRows();
+ toMove.second = tmpGranule;
+ }
+
+ tsIds.swap(newTsIds);
+ }
+ Y_VERIFY(tsIds.size() > 1);
+ Y_VERIFY(tsIds[0] == std::make_pair(ts0, ui64(1)));
+ TColumnEngineForLogs::TMarksGranules marksGranules(std::move(tsIds));
+
+ // Slice inserted portions with granules' borders
+ THashMap<ui64, std::vector<std::shared_ptr<arrow::RecordBatch>>> idBatches;
+ TVector<TPortionInfo*> toSwitch;
+ toSwitch.reserve(portions.size());
+ for (size_t i = 0; i < portions.size(); ++i) {
+ auto& portion = portions[i];
+ auto& batch = srcBatches[i];
+ auto slices = marksGranules.SliceIntoGranules(batch, IndexInfo);
+
+ THashSet<ui64> ids;
+ for (auto& [id, slice] : slices) {
+ if (slice && slice->num_rows()) {
+ ids.insert(id);
+ idBatches[id].emplace_back(std::move(slice));
+ }
+ }
+
+ // Optimization: move the inserted portions that were not split instead of reappending them.
+ if (ids.size() == 1) {
+ ui64 id = *ids.begin();
+ idBatches[id].resize(idBatches[id].size() - 1);
+ ui64 tmpGranule = id - 1;
+ changes->PortionsToMove.emplace_back(std::move(portion), tmpGranule);
+ } else {
+ toSwitch.push_back(&portion);
+ }
+ }
+
+ // Update SwitchedPortions if any portions were moved.
+ if (toSwitch.size() != portions.size()) {
+ TVector<TPortionInfo> tmp;
+ tmp.reserve(toSwitch.size());
+ for (auto* portionInfo : toSwitch) {
+ tmp.emplace_back(std::move(*portionInfo));
+ }
+ portions.swap(tmp);
+ }
+
+ for (const auto& [mark, id] : marksGranules.GetOrderedMarks()) {
+ ui64 tmpGranule = changes->SetTmpGranule(pathId, mark);
+
+ for (const auto& batch : idBatches[id]) {
+ // Cannot set the snapshot here; it will be set by the committing transaction in ApplyChanges().
+ auto newPortions = MakeAppendedPortions(pathId, batch, tmpGranule, TSnapshot{}, blobs);
+ Y_VERIFY(newPortions.size() > 0);
+ for (auto& portion : newPortions) {
+ changes->AppendedPortions.emplace_back(std::move(portion));
+ }
+ }
+ }
+ } else {
+ auto batches = SliceGranuleBatches(IndexInfo, *changes, srcBatches, ts0);
+
+ changes->SetTmpGranule(pathId, ts0);
+ for (auto& [ts, batch] : batches) {
+ // The tmp granule will be updated to its correct value in ApplyChanges().
+ ui64 tmpGranule = changes->SetTmpGranule(pathId, ts);
+
+ // Cannot set the snapshot here; it will be set by the committing transaction in ApplyChanges().
+ auto portions = MakeAppendedPortions(pathId, batch, tmpGranule, TSnapshot{}, blobs);
+ Y_VERIFY(portions.size() > 0);
+ for (auto& portion : portions) {
+ changes->AppendedPortions.emplace_back(std::move(portion));
+ }
+ }
+ }
+
+ return blobs;
+}
+
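Aside on the border recalculation above: NumSplitInto(numRows) picks the target granule count, rowsInGranule = numRows / numSplitInto sets the row budget, and the marks are regrouped by accumulating rows against that budget. A simplified, self-contained mirror of the regrouping loop (integer marks instead of TMark; reassigning the moved portions to granules is omitted):

    #include <cstddef>
    #include <cstdint>
    #include <utility>
    #include <vector>

    using TToyMark = int; // the real code uses TMark built from the PK border

    // Regroups source marks into wider granules of roughly rowsInGranule rows,
    // always keeping the first (source-granule) mark, as in the loop above.
    std::vector<std::pair<TToyMark, std::uint64_t>> RegroupMarks(
            const std::vector<TToyMark>& marks,
            const std::vector<std::uint32_t>& rowsPerMark,
            std::uint32_t rowsInGranule) {
        std::vector<std::pair<TToyMark, std::uint64_t>> out;
        out.emplace_back(marks.front(), 1); // the source granule mark always comes first
        std::uint32_t sumRows = 0;
        std::uint64_t granule = 0;
        for (std::size_t i = 0; i != marks.size(); ++i) {
            if (sumRows >= rowsInGranule || (i + 1 == marks.size() && out.size() == 1)) {
                ++granule;
                out.emplace_back(marks[i], granule + 1);
                sumRows = 0;
            }
            sumRows += rowsPerMark[i];
        }
        return out;
    }

    int main() {
        // 4 marks of 500 rows each with a 1000-row budget -> two granules in total.
        auto ids = RegroupMarks({10, 20, 30, 40}, {500, 500, 500, 500}, 1000);
        return ids.size() == 2 ? 0 : 1;
    }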
+TVector<TString> TCompactionLogic::Apply(std::shared_ptr<TColumnEngineChanges> changes) const {
+ Y_VERIFY(changes);
+ Y_VERIFY(changes->CompactionInfo);
+ Y_VERIFY(changes->DataToIndex.empty()); // not used
+ Y_VERIFY(!changes->Blobs.empty()); // src data
+ Y_VERIFY(!changes->SwitchedPortions.empty()); // src meta
+ Y_VERIFY(changes->AppendedPortions.empty()); // dst meta
+
+ auto castedChanges = std::static_pointer_cast<TColumnEngineForLogs::TChanges>(changes);
+ if (castedChanges->CompactionInfo->InGranule) {
+ return CompactInGranule(castedChanges);
+ }
+ return CompactSplitGranule(castedChanges);
+}
+
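Aside on the downcast above: Apply() uses std::static_pointer_cast rather than dynamic_cast because the changes object is produced by TColumnEngineForLogs, so its dynamic type is known by construction. A generic standalone illustration of the pattern (toy type names):

    #include <memory>

    struct TBase { virtual ~TBase() = default; };
    struct TDerived : TBase { int Value = 42; };

    int main() {
        // The factory is known to produce TDerived, mirroring how the engine
        // produces TColumnEngineForLogs::TChanges behind a base-class pointer.
        std::shared_ptr<TBase> base = std::make_shared<TDerived>();
        auto derived = std::static_pointer_cast<TDerived>(base); // no RTTI needed
        return derived->Value == 42 ? 0 : 1;
    }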
+TVector<TString> TEvictionLogic::Apply(std::shared_ptr<TColumnEngineChanges> changes) const {
+ Y_VERIFY(changes);
+ Y_VERIFY(!changes->Blobs.empty()); // src data
+ Y_VERIFY(!changes->PortionsToEvict.empty()); // src meta
+ Y_VERIFY(changes->EvictedRecords.empty()); // dst meta
+
+ TVector<TString> newBlobs;
+ TVector<std::pair<TPortionInfo, TPortionEvictionFeatures>> evicted;
+ evicted.reserve(changes->PortionsToEvict.size());
+
+ for (auto& [portionInfo, evictFeatures] : changes->PortionsToEvict) {
+ Y_VERIFY(!portionInfo.Empty());
+ Y_VERIFY(portionInfo.IsActive());
+
+ if (UpdateEvictedPortion(portionInfo, evictFeatures, changes->Blobs,
+ changes->EvictedRecords, newBlobs)) {
+ Y_VERIFY(portionInfo.TierName == evictFeatures.TargetTierName);
+ evicted.emplace_back(std::move(portionInfo), evictFeatures);
+ }
+ }
+
+ changes->PortionsToEvict.swap(evicted);
+ return newBlobs;
+}
+}
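Aside on TEvictionLogic::Apply: it keeps only the portions that UpdateEvictedPortion accepted, using the same move-into-a-fresh-vector-and-swap idiom as TryMovePortions. A minimal standalone sketch of the idiom (ints stand in for portions; the even/odd test stands in for the eviction succeeding):

    #include <utility>
    #include <vector>

    int main() {
        std::vector<int> portions{1, 2, 3, 4};
        std::vector<int> evicted;
        evicted.reserve(portions.size());
        for (int& p : portions) {
            if (p % 2 == 0) {                   // stands in for UpdateEvictedPortion()
                evicted.emplace_back(std::move(p)); // trivial for int; mirrors the idiom
            }
        }
        portions.swap(evicted);                 // PortionsToEvict.swap(evicted) in the real code
        return portions.size() == 2 ? 0 : 1;
    }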
diff --git a/ydb/core/tx/columnshard/engines/index_logic.h b/ydb/core/tx/columnshard/engines/index_logic.h
new file mode 100644
index 00000000000..86158ca4cfc
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/index_logic.h
@@ -0,0 +1,109 @@
+#pragma once
+
+#include "defs.h"
+#include "portion_info.h"
+#include "column_engine_logs.h"
+
+namespace NKikimr::NOlap {
+
+class TIndexLogicBase {
+protected:
+ const TIndexInfo& IndexInfo;
+private:
+ const THashMap<ui64, NKikimr::NOlap::TTiering>* TieringMap = nullptr;
+
+public:
+ TIndexLogicBase(const TIndexInfo& indexInfo, const THashMap<ui64, NKikimr::NOlap::TTiering>& tieringMap)
+ : IndexInfo(indexInfo)
+ , TieringMap(&tieringMap)
+ {
+ }
+
+ TIndexLogicBase(const TIndexInfo& indexInfo)
+ : IndexInfo(indexInfo) {
+ }
+
+ virtual ~TIndexLogicBase() {
+ }
+ virtual TVector<TString> Apply(std::shared_ptr<TColumnEngineChanges> indexChanges) const = 0;
+
+ static THashMap<ui64, std::shared_ptr<arrow::RecordBatch>> SliceIntoGranules(const std::shared_ptr<arrow::RecordBatch>& batch,
+ const std::vector<std::pair<TMark, ui64>>& granules,
+ const TIndexInfo& indexInfo);
+
+protected:
+ TVector<TPortionInfo> MakeAppendedPortions(const ui64 pathId,
+ const std::shared_ptr<arrow::RecordBatch> batch,
+ const ui64 granule,
+ const TSnapshot& minSnapshot,
+ TVector<TString>& blobs) const;
+
+ static std::shared_ptr<arrow::RecordBatch> GetEffectiveKey(const std::shared_ptr<arrow::RecordBatch>& batch,
+ const TIndexInfo& indexInfo);
+
+ const THashMap<ui64, NKikimr::NOlap::TTiering>& GetTieringMap() const {
+ if (TieringMap) {
+ return *TieringMap;
+ }
+ return Default<THashMap<ui64, NKikimr::NOlap::TTiering>>();
+ }
+};
+
+class TIndexationLogic: public TIndexLogicBase {
+public:
+ using TIndexLogicBase::TIndexLogicBase;
+
+ TVector<TString> Apply(std::shared_ptr<TColumnEngineChanges> indexChanges) const override;
+
+private:
+ // Although the source batches are ordered only by PK (the sorting key), the resulting pathBatches are ordered by the extended key.
+ // They have constant snapshot columns that do not break the ordering within a batch.
+ std::shared_ptr<arrow::RecordBatch> AddSpecials(const std::shared_ptr<arrow::RecordBatch>& srcBatch,
+ const TIndexInfo& indexInfo, const TInsertedData& inserted) const;
+};
+
+class TCompactionLogic: public TIndexLogicBase {
+public:
+ using TIndexLogicBase::TIndexLogicBase;
+
+ TVector<TString> Apply(std::shared_ptr<TColumnEngineChanges> indexChanges) const override;
+
+private:
+ TVector<TString> CompactSplitGranule(const std::shared_ptr<TColumnEngineForLogs::TChanges>& changes) const;
+ TVector<TString> CompactInGranule(std::shared_ptr<TColumnEngineForLogs::TChanges> changes) const;
+ std::shared_ptr<arrow::RecordBatch> CompactInOneGranule(ui64 granule, const TVector<TPortionInfo>& portions, const THashMap<TBlobRange, TString>& blobs) const;
+
+ /// @return vec({ts, batch}) with ts0 <= ts1 <= ... <= tsN.
+ /// @note We use the ts from the PK for splitting, but there may be many PKs with the same ts.
+ TVector<std::pair<TMark, std::shared_ptr<arrow::RecordBatch>>>
+ SliceGranuleBatches(const TIndexInfo& indexInfo,
+ const TColumnEngineForLogs::TChanges& changes,
+ const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches,
+ const TMark& ts0) const;
+
+ /// @param[in,out] portions unchanged, or only the inserted portions in the same order
+ /// @param[in,out] tsIds unchanged, or the marks of the compacted portions ordered by mark
+ /// @param[in,out] toMove unchanged, or the compacted portions ordered by primary key
+ ui64 TryMovePortions(const TMark& ts0,
+ TVector<TPortionInfo>& portions,
+ std::vector<std::pair<TMark, ui64>>& tsIds,
+ TVector<std::pair<TPortionInfo, ui64>>& toMove) const;
+
+ std::vector<std::shared_ptr<arrow::RecordBatch>> PortionsToBatches(const TVector<TPortionInfo>& portions,
+ const THashMap<TBlobRange, TString>& blobs,
+ bool insertedOnly = false) const;
+};
+
+class TEvictionLogic: public TIndexLogicBase {
+public:
+ using TIndexLogicBase::TIndexLogicBase;
+
+ TVector<TString> Apply(std::shared_ptr<TColumnEngineChanges> indexChanges) const override;
+
+private:
+ bool UpdateEvictedPortion(TPortionInfo& portionInfo,
+ TPortionEvictionFeatures& evictFeatures, const THashMap<TBlobRange, TString>& srcBlobs,
+ TVector<TColumnRecord>& evictedRecords, TVector<TString>& newBlobs) const;
+};
+
+}
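Aside on the new header: the former static entry points (IndexBlobs, CompactBlobs, EvictBlobs) become three strategy classes behind TIndexLogicBase::Apply(), with the index schema and tiering map injected through the constructor. A toy mirror of that shape, with illustrative names only:

    #include <memory>
    #include <string>
    #include <vector>

    // Illustrative stand-in for TIndexLogicBase and its three subclasses.
    class ILogic {
    public:
        virtual ~ILogic() = default;
        virtual std::vector<std::string> Apply(int changes) const = 0;
    };

    class TToyCompaction : public ILogic {
    public:
        explicit TToyCompaction(std::string context) : Context(std::move(context)) {}
        std::vector<std::string> Apply(int changes) const override {
            return {Context + ":" + std::to_string(changes)};
        }
    private:
        std::string Context; // constructor-injected context, like IndexInfo/TieringMap
    };

    int main() {
        std::unique_ptr<ILogic> logic = std::make_unique<TToyCompaction>("indexInfo");
        return logic->Apply(3).front() == "indexInfo:3" ? 0 : 1;
    }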
diff --git a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
index 740c3766530..c371f446aaf 100644
--- a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
+++ b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
@@ -1,5 +1,6 @@
#include <library/cpp/testing/unittest/registar.h>
#include "column_engine_logs.h"
+#include "index_logic.h"
#include "predicate.h"
@@ -279,8 +280,8 @@ bool Insert(TColumnEngineForLogs& engine, TTestDbWrapper& db, TSnapshot snap,
changes->Blobs.insert(blobs.begin(), blobs.end());
- TVector<TString> newBlobs = TColumnEngineForLogs::IndexBlobs(engine.GetIndexInfo(), {}, changes);
-
+ TIndexationLogic logic(engine.GetIndexInfo());
+ TVector<TString> newBlobs = logic.Apply(changes);
UNIT_ASSERT_VALUES_EQUAL(changes->AppendedPortions.size(), 1);
UNIT_ASSERT_VALUES_EQUAL(newBlobs.size(), testColumns.size() + 2); // add 2 columns: planStep, txId
@@ -314,8 +315,8 @@ bool Compact(TColumnEngineForLogs& engine, TTestDbWrapper& db, TSnapshot snap, T
UNIT_ASSERT_VALUES_EQUAL(changes->SwitchedPortions.size(), expected.SrcPortions);
changes->SetBlobs(std::move(blobs));
- TVector<TString> newBlobs = TColumnEngineForLogs::CompactBlobs(engine.GetIndexInfo(), {}, changes);
-
+ TCompactionLogic logic(engine.GetIndexInfo());
+ TVector<TString> newBlobs = logic.Apply(changes);
UNIT_ASSERT_VALUES_EQUAL(changes->AppendedPortions.size(), expected.NewPortions);
AddIdsToBlobs(newBlobs, changes->AppendedPortions, changes->Blobs, step);
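Aside on the test call sites above: the tests build the logic objects with only the index info, taking the single-argument constructor, so GetTieringMap() falls back to an empty default map. A self-contained sketch of that fallback (std::map standing in for THashMap, a static empty map standing in for Default<>(), all names illustrative):

    #include <map>
    #include <string>

    class TToyLogic {
    public:
        TToyLogic() = default; // the tests use this form: TIndexationLogic(engine.GetIndexInfo())
        explicit TToyLogic(const std::map<int, std::string>& tiering) : Tiering(&tiering) {}

        const std::map<int, std::string>& GetTiering() const {
            static const std::map<int, std::string> empty; // mirrors Default<THashMap<...>>()
            return Tiering ? *Tiering : empty;
        }

    private:
        const std::map<int, std::string>* Tiering = nullptr; // non-owning, like TieringMap
    };

    int main() {
        TToyLogic noTiering;
        std::map<int, std::string> tiers{{1, "cold"}};
        TToyLogic withTiering(tiers);
        return (noTiering.GetTiering().empty() && withTiering.GetTiering().size() == 1) ? 0 : 1;
    }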
diff --git a/ydb/core/tx/columnshard/eviction_actor.cpp b/ydb/core/tx/columnshard/eviction_actor.cpp
index 25d4a3f9f35..b5af4486629 100644
--- a/ydb/core/tx/columnshard/eviction_actor.cpp
+++ b/ydb/core/tx/columnshard/eviction_actor.cpp
@@ -1,5 +1,6 @@
#include "columnshard_impl.h"
#include <ydb/core/tx/columnshard/engines/column_engine_logs.h>
+#include <ydb/core/tx/columnshard/engines/index_logic.h>
#include "blob_cache.h"
namespace NKikimr::NColumnShard {
@@ -126,8 +127,9 @@ private:
TCpuGuard guard(TxEvent->ResourceUsage);
TxEvent->IndexChanges->SetBlobs(std::move(Blobs));
-
- TxEvent->Blobs = NOlap::TColumnEngineForLogs::EvictBlobs(TxEvent->IndexInfo, TxEvent->Tiering, TxEvent->IndexChanges);
+ NOlap::TEvictionLogic evictionLogic(TxEvent->IndexInfo, TxEvent->Tiering);
+ TxEvent->Blobs = evictionLogic.Apply(TxEvent->IndexChanges);
+
if (TxEvent->Blobs.empty()) {
TxEvent->PutStatus = NKikimrProto::OK;
}
diff --git a/ydb/core/tx/columnshard/indexing_actor.cpp b/ydb/core/tx/columnshard/indexing_actor.cpp
index 038cdc9e9d3..29f574f1805 100644
--- a/ydb/core/tx/columnshard/indexing_actor.cpp
+++ b/ydb/core/tx/columnshard/indexing_actor.cpp
@@ -1,5 +1,6 @@
#include "columnshard_impl.h"
#include <ydb/core/tx/columnshard/engines/column_engine_logs.h>
+#include <ydb/core/tx/columnshard/engines/index_logic.h>
#include "blob_cache.h"
namespace NKikimr::NColumnShard {
@@ -123,8 +124,8 @@ private:
LOG_S_DEBUG("Indexing started at tablet " << TabletId);
TCpuGuard guard(TxEvent->ResourceUsage);
- TxEvent->Blobs = NOlap::TColumnEngineForLogs::IndexBlobs(TxEvent->IndexInfo, TxEvent->Tiering, TxEvent->IndexChanges);
-
+ NOlap::TIndexationLogic indexationLogic(TxEvent->IndexInfo, TxEvent->Tiering);
+ TxEvent->Blobs = indexationLogic.Apply(TxEvent->IndexChanges);
LOG_S_DEBUG("Indexing finished at tablet " << TabletId);
} else {
LOG_S_ERROR("Indexing failed at tablet " << TabletId);