summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov333 <[email protected]>2024-04-15 16:12:59 +0300
committerGitHub <[email protected]>2024-04-15 16:12:59 +0300
commit5a5d4cca3670fcdc555c7fa5744145787ed7030f (patch)
tree9b64f997165abfcb49a27d01fc769907c5438f2a
parent260fcedc15b1ff31294e9dffccd6a0a679a80aa8 (diff)
split portion as object and portion constructor. same with portion meta (#3717)
-rw-r--r--ydb/core/protos/config.proto1
-rw-r--r--ydb/core/tx/columnshard/blobs_action/transaction/tx_write_index.cpp11
-rw-r--r--ydb/core/tx/columnshard/engines/changes/abstract/abstract.cpp4
-rw-r--r--ydb/core/tx/columnshard/engines/changes/abstract/abstract.h1
-rw-r--r--ydb/core/tx/columnshard/engines/changes/actualization/construction/context.cpp4
-rw-r--r--ydb/core/tx/columnshard/engines/changes/compaction.cpp4
-rw-r--r--ydb/core/tx/columnshard/engines/changes/general_compaction.cpp5
-rw-r--r--ydb/core/tx/columnshard/engines/changes/ttl.cpp7
-rw-r--r--ydb/core/tx/columnshard/engines/changes/ttl.h1
-rw-r--r--ydb/core/tx/columnshard/engines/changes/with_appended.cpp32
-rw-r--r--ydb/core/tx/columnshard/engines/changes/with_appended.h22
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine.cpp34
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine.h41
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine_logs.cpp53
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine_logs.h4
-rw-r--r--ydb/core/tx/columnshard/engines/db_wrapper.cpp29
-rw-r--r--ydb/core/tx/columnshard/engines/db_wrapper.h9
-rw-r--r--ydb/core/tx/columnshard/engines/portions/column_record.h2
-rw-r--r--ydb/core/tx/columnshard/engines/portions/constructor.cpp87
-rw-r--r--ydb/core/tx/columnshard/engines/portions/constructor.h333
-rw-r--r--ydb/core/tx/columnshard/engines/portions/constructor_meta.cpp105
-rw-r--r--ydb/core/tx/columnshard/engines/portions/constructor_meta.h52
-rw-r--r--ydb/core/tx/columnshard/engines/portions/meta.cpp96
-rw-r--r--ydb/core/tx/columnshard/engines/portions/meta.h54
-rw-r--r--ydb/core/tx/columnshard/engines/portions/portion_info.cpp93
-rw-r--r--ydb/core/tx/columnshard/engines/portions/portion_info.h151
-rw-r--r--ydb/core/tx/columnshard/engines/portions/read_with_blobs.cpp18
-rw-r--r--ydb/core/tx/columnshard/engines/portions/read_with_blobs.h10
-rw-r--r--ydb/core/tx/columnshard/engines/portions/write_with_blobs.cpp24
-rw-r--r--ydb/core/tx/columnshard/engines/portions/write_with_blobs.h52
-rw-r--r--ydb/core/tx/columnshard/engines/portions/ya.make2
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/read_metadata.h3
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/indexes/abstract/meta.cpp3
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/indexes/abstract/meta.h2
-rw-r--r--ydb/core/tx/columnshard/engines/storage/chunks/data.cpp3
-rw-r--r--ydb/core/tx/columnshard/engines/storage/chunks/data.h2
-rw-r--r--ydb/core/tx/columnshard/engines/storage/granule.cpp23
-rw-r--r--ydb/core/tx/columnshard/engines/storage/granule.h5
-rw-r--r--ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.h8
-rw-r--r--ydb/core/tx/columnshard/engines/ut/ut_insert_table.cpp4
-rw-r--r--ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp52
-rw-r--r--ydb/core/tx/columnshard/normalizer/portion/min_max.cpp50
-rw-r--r--ydb/core/tx/columnshard/splitter/abstract/chunks.h5
-rw-r--r--ydb/core/tx/columnshard/splitter/chunks.cpp3
-rw-r--r--ydb/core/tx/columnshard/splitter/chunks.h2
-rw-r--r--ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp4
46 files changed, 925 insertions, 585 deletions
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto
index 2706be6a264..a6385e5c72a 100644
--- a/ydb/core/protos/config.proto
+++ b/ydb/core/protos/config.proto
@@ -1391,7 +1391,6 @@ message TColumnShardConfig {
}
optional TTablesStorageLayoutPolicy TablesStorageLayoutPolicy = 1;
optional bool DisabledOnSchemeShard = 2 [default = true];
- optional bool SkipOldGranules = 3 [default = true];
optional bool IndexationEnabled = 4 [default = true];
optional bool CompactionEnabled = 5 [default = true];
optional bool TTLEnabled = 6 [default = true];
diff --git a/ydb/core/tx/columnshard/blobs_action/transaction/tx_write_index.cpp b/ydb/core/tx/columnshard/blobs_action/transaction/tx_write_index.cpp
index 977adb585ea..750ffefa239 100644
--- a/ydb/core/tx/columnshard/blobs_action/transaction/tx_write_index.cpp
+++ b/ydb/core/tx/columnshard/blobs_action/transaction/tx_write_index.cpp
@@ -21,7 +21,7 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext& ctx)
TBlobGroupSelector dsGroupSelector(Self->Info());
NOlap::TDbWrapper dbWrap(txc.DB, &dsGroupSelector);
- AFL_VERIFY(Self->TablesManager.MutablePrimaryIndex().ApplyChanges(dbWrap, changes, snapshot));
+ AFL_VERIFY(Self->TablesManager.MutablePrimaryIndex().ApplyChangesOnExecute(dbWrap, changes, snapshot));
LOG_S_DEBUG(TxPrefix() << "(" << changes->TypeString() << ") apply" << TxSuffix());
NOlap::TWriteIndexContext context(&txc.DB, dbWrap, Self->MutableIndexAs<NOlap::TColumnEngineForLogs>());
changes->WriteIndexOnExecute(Self, context);
@@ -35,10 +35,8 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext& ctx)
NOlap::TBlobManagerDb blobsDb(txc.DB);
changes->MutableBlobsAction().OnExecuteTxAfterAction(*Self, blobsDb, false);
for (ui32 i = 0; i < changes->GetWritePortionsCount(); ++i) {
- auto& portion = changes->GetWritePortionInfo(i)->GetPortionInfo();
- for (auto&& i : portion.Records) {
- LOG_S_WARN(TxPrefix() << "(" << changes->TypeString() << ":" << portion.RestoreBlobRange(i.BlobRange) << ") blob cannot apply changes: " << TxSuffix());
- }
+ auto& portion = changes->GetWritePortionInfo(i)->GetPortionResult();
+ LOG_S_WARN(TxPrefix() << "(" << changes->TypeString() << ":" << portion.DebugString() << ") blob cannot apply changes: " << TxSuffix());
}
NOlap::TChangesFinishContext context("cannot write index blobs");
changes->Abort(*Self, context);
@@ -83,6 +81,9 @@ TTxWriteIndex::TTxWriteIndex(TColumnShard* self, TEvPrivate::TEvWriteIndex::TPtr
, Ev(ev)
, TabletTxNo(++Self->TabletTxCounter)
{
+ NOlap::TSnapshot snapshot(Self->LastPlannedStep, Self->LastPlannedTxId);
+ auto changes = Ev->Get()->IndexChanges;
+ AFL_VERIFY(Self->TablesManager.MutablePrimaryIndex().ApplyChangesOnTxCreate(changes, snapshot));
Y_ABORT_UNLESS(Ev && Ev->Get()->IndexChanges);
}
diff --git a/ydb/core/tx/columnshard/engines/changes/abstract/abstract.cpp b/ydb/core/tx/columnshard/engines/changes/abstract/abstract.cpp
index 295f182216c..c3496e54aee 100644
--- a/ydb/core/tx/columnshard/engines/changes/abstract/abstract.cpp
+++ b/ydb/core/tx/columnshard/engines/changes/abstract/abstract.cpp
@@ -53,12 +53,10 @@ void TColumnEngineChanges::WriteIndexOnComplete(NColumnShard::TColumnShard* self
void TColumnEngineChanges::Compile(TFinalizationContext& context) noexcept {
Y_ABORT_UNLESS(Stage != EStage::Aborted);
- if ((ui32)Stage >= (ui32)EStage::Compiled) {
- return;
- }
Y_ABORT_UNLESS(Stage == EStage::Constructed);
DoCompile(context);
+ DoOnAfterCompile();
Stage = EStage::Compiled;
}
diff --git a/ydb/core/tx/columnshard/engines/changes/abstract/abstract.h b/ydb/core/tx/columnshard/engines/changes/abstract/abstract.h
index 339face80d3..d65cdc755f4 100644
--- a/ydb/core/tx/columnshard/engines/changes/abstract/abstract.h
+++ b/ydb/core/tx/columnshard/engines/changes/abstract/abstract.h
@@ -201,6 +201,7 @@ private:
protected:
virtual void DoDebugString(TStringOutput& out) const = 0;
virtual void DoCompile(TFinalizationContext& context) = 0;
+ virtual void DoOnAfterCompile() {}
virtual void DoWriteIndexOnExecute(NColumnShard::TColumnShard* self, TWriteIndexContext& context) = 0;
virtual void DoWriteIndexOnComplete(NColumnShard::TColumnShard* self, TWriteIndexCompleteContext& context) = 0;
virtual void DoOnFinish(NColumnShard::TColumnShard& self, TChangesFinishContext& context) = 0;
diff --git a/ydb/core/tx/columnshard/engines/changes/actualization/construction/context.cpp b/ydb/core/tx/columnshard/engines/changes/actualization/construction/context.cpp
index 6b1ea617f21..919abdfa967 100644
--- a/ydb/core/tx/columnshard/engines/changes/actualization/construction/context.cpp
+++ b/ydb/core/tx/columnshard/engines/changes/actualization/construction/context.cpp
@@ -56,7 +56,7 @@ bool TTieringProcessContext::AddPortion(const TPortionInfo& info, TPortionEvicti
if (features.GetTargetTierName() == NTiering::NCommon::DeleteTierName) {
AFL_VERIFY(dWait);
Counters.OnPortionToDrop(info.GetTotalBlobBytes(), *dWait);
- it->second.back().GetTask()->PortionsToRemove.emplace(info.GetAddress(), info);
+ it->second.back().GetTask()->AddPortionToRemove(info);
AFL_VERIFY(!it->second.back().GetTask()->GetPortionsToEvictCount())("rw", features.GetRWAddress().DebugString())("f", it->first.DebugString());
} else {
if (!dWait) {
@@ -65,7 +65,7 @@ bool TTieringProcessContext::AddPortion(const TPortionInfo& info, TPortionEvicti
Counters.OnPortionToEvict(info.GetTotalBlobBytes(), *dWait);
}
it->second.back().GetTask()->AddPortionToEvict(info, std::move(features));
- AFL_VERIFY(it->second.back().GetTask()->PortionsToRemove.empty())("rw", features.GetRWAddress().DebugString())("f", it->first.DebugString());
+ AFL_VERIFY(!it->second.back().GetTask()->HasPortionsToRemove())("rw", features.GetRWAddress().DebugString())("f", it->first.DebugString());
}
return true;
}
diff --git a/ydb/core/tx/columnshard/engines/changes/compaction.cpp b/ydb/core/tx/columnshard/engines/changes/compaction.cpp
index 24e1e8c5c0e..a94d160158e 100644
--- a/ydb/core/tx/columnshard/engines/changes/compaction.cpp
+++ b/ydb/core/tx/columnshard/engines/changes/compaction.cpp
@@ -23,7 +23,7 @@ void TCompactColumnEngineChanges::DoCompile(TFinalizationContext& context) {
const TPortionMeta::EProduced producedClassResultCompaction = GetResultProducedClass();
for (auto& portionInfo : AppendedPortions) {
- portionInfo.GetPortionInfo().UpdateRecordsMeta(producedClassResultCompaction);
+ portionInfo.GetPortionConstructor().MutableMeta().UpdateRecordsMeta(producedClassResultCompaction);
}
}
@@ -77,7 +77,7 @@ TCompactColumnEngineChanges::TCompactColumnEngineChanges(std::shared_ptr<TGranul
for (const auto& portionInfo : portions) {
Y_ABORT_UNLESS(!portionInfo->HasRemoveSnapshot());
SwitchedPortions.emplace_back(*portionInfo);
- AFL_VERIFY(PortionsToRemove.emplace(portionInfo->GetAddress(), *portionInfo).second);
+ AddPortionToRemove(*portionInfo);
Y_ABORT_UNLESS(portionInfo->GetPathId() == GranuleMeta->GetPathId());
}
Y_ABORT_UNLESS(SwitchedPortions.size());
diff --git a/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp b/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp
index 706a0924c5c..3f5e692df4a 100644
--- a/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp
+++ b/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp
@@ -200,7 +200,8 @@ void TGeneralCompactColumnEngineChanges::BuildAppendedPortionsByChunks(TConstruc
AppendedPortions.back().FillStatistics(resultSchema->GetIndexInfo());
NArrow::TFirstLastSpecialKeys primaryKeys(slice.GetFirstLastPKBatch(resultSchema->GetIndexInfo().GetReplaceKey()));
NArrow::TMinMaxSpecialKeys snapshotKeys(b, TIndexInfo::ArrowSchemaSnapshot());
- AppendedPortions.back().GetPortionInfo().AddMetadata(*resultSchema, primaryKeys, snapshotKeys, IStoragesManager::DefaultStorageId);
+ AppendedPortions.back().GetPortionConstructor().AddMetadata(*resultSchema, primaryKeys, snapshotKeys);
+ AppendedPortions.back().GetPortionConstructor().MutableMeta().SetTierName(IStoragesManager::DefaultStorageId);
recordIdx += slice.GetRecordsCount();
}
}
@@ -245,7 +246,7 @@ TConclusionStatus TGeneralCompactColumnEngineChanges::DoConstructBlobs(TConstruc
TStringBuilder sbAppended;
for (auto&& p : AppendedPortions) {
- sbAppended << p.GetPortionInfo().DebugString() << ";";
+ sbAppended << p.GetPortionConstructor().DebugString() << ";";
}
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "blobs_created_diff")("appended", sbAppended)("switched", sbSwitched);
}
diff --git a/ydb/core/tx/columnshard/engines/changes/ttl.cpp b/ydb/core/tx/columnshard/engines/changes/ttl.cpp
index c2107a3515d..f23c799e331 100644
--- a/ydb/core/tx/columnshard/engines/changes/ttl.cpp
+++ b/ydb/core/tx/columnshard/engines/changes/ttl.cpp
@@ -14,7 +14,7 @@ void TTTLColumnEngineChanges::DoDebugString(TStringOutput& out) const {
}
void TTTLColumnEngineChanges::DoStart(NColumnShard::TColumnShard& self) {
- Y_ABORT_UNLESS(PortionsToEvict.size() || PortionsToRemove.size());
+ Y_ABORT_UNLESS(PortionsToEvict.size() || HasPortionsToRemove());
THashMap<TString, THashSet<TBlobRange>> blobRanges;
auto& engine = self.MutableIndexAs<TColumnEngineForLogs>();
auto& index = engine.GetVersionedIndex();
@@ -39,7 +39,7 @@ void TTTLColumnEngineChanges::DoOnFinish(NColumnShard::TColumnShard& self, TChan
for (auto&& i : PortionsToEvict) {
AFL_VERIFY(restoreIndexAddresses[i.GetPortionInfo().GetPathId()].emplace(i.GetPortionInfo().GetPortionId()).second);
}
- for (auto&& i : PortionsToRemove) {
+ for (auto&& i : GetPortionsToRemove()) {
AFL_VERIFY(restoreIndexAddresses[i.first.GetPathId()].emplace(i.first.GetPortionId()).second);
}
engine.ReturnToIndexes(restoreIndexAddresses);
@@ -66,8 +66,7 @@ NKikimr::TConclusionStatus TTTLColumnEngineChanges::DoConstructBlobs(TConstructi
for (auto&& info : PortionsToEvict) {
if (auto pwb = UpdateEvictedPortion(info, Blobs, context)) {
- info.MutablePortionInfo().SetRemoveSnapshot(context.LastCommittedTx);
- AFL_VERIFY(PortionsToRemove.emplace(info.GetPortionInfo().GetAddress(), info.GetPortionInfo()).second);
+ AddPortionToRemove(info.GetPortionInfo());
AppendedPortions.emplace_back(std::move(*pwb));
}
}
diff --git a/ydb/core/tx/columnshard/engines/changes/ttl.h b/ydb/core/tx/columnshard/engines/changes/ttl.h
index 27d76b750dc..92eb0ffa9b3 100644
--- a/ydb/core/tx/columnshard/engines/changes/ttl.h
+++ b/ydb/core/tx/columnshard/engines/changes/ttl.h
@@ -94,7 +94,6 @@ public:
ui32 GetPortionsToEvictCount() const {
return PortionsToEvict.size();
}
-
void AddPortionToEvict(const TPortionInfo& info, TPortionEvictionFeatures&& features) {
Y_ABORT_UNLESS(!info.Empty());
Y_ABORT_UNLESS(!info.HasRemoveSnapshot());
diff --git a/ydb/core/tx/columnshard/engines/changes/with_appended.cpp b/ydb/core/tx/columnshard/engines/changes/with_appended.cpp
index ba99045e723..4b759675c17 100644
--- a/ydb/core/tx/columnshard/engines/changes/with_appended.cpp
+++ b/ydb/core/tx/columnshard/engines/changes/with_appended.cpp
@@ -17,7 +17,7 @@ void TChangesWithAppend::DoWriteIndexOnExecute(NColumnShard::TColumnShard* self,
portionInfo.SaveToDatabase(context.DBWrapper, schemaPtr->GetIndexInfo().GetPKFirstColumnId(), true);
}
const auto predRemoveDroppedTable = [self](const TWritePortionInfoWithBlobs& item) {
- auto& portionInfo = item.GetPortionInfo();
+ auto& portionInfo = item.GetPortionResult();
if (!!self && (!self->TablesManager.HasTable(portionInfo.GetPathId()) || self->TablesManager.GetTable(portionInfo.GetPathId()).IsDropped())) {
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "skip_inserted_data")("reason", "table_removed")("path_id", portionInfo.GetPathId());
return true;
@@ -27,8 +27,7 @@ void TChangesWithAppend::DoWriteIndexOnExecute(NColumnShard::TColumnShard* self,
};
AppendedPortions.erase(std::remove_if(AppendedPortions.begin(), AppendedPortions.end(), predRemoveDroppedTable), AppendedPortions.end());
for (auto& portionInfoWithBlobs : AppendedPortions) {
- auto& portionInfo = portionInfoWithBlobs.GetPortionInfo();
- Y_ABORT_UNLESS(!portionInfo.Empty());
+ auto& portionInfo = portionInfoWithBlobs.GetPortionResult();
AFL_VERIFY(usedPortionIds.emplace(portionInfo.GetPortionId()).second)("portion_info", portionInfo.DebugString(true));
portionInfo.SaveToDatabase(context.DBWrapper, schemaPtr->GetIndexInfo().GetPKFirstColumnId(), false);
}
@@ -36,8 +35,9 @@ void TChangesWithAppend::DoWriteIndexOnExecute(NColumnShard::TColumnShard* self,
void TChangesWithAppend::DoWriteIndexOnComplete(NColumnShard::TColumnShard* self, TWriteIndexCompleteContext& context) {
if (self) {
- for (auto& portionInfo : AppendedPortions) {
- switch (portionInfo.GetPortionInfo().GetMeta().Produced) {
+ for (auto& portionBuilder : AppendedPortions) {
+ auto& portionInfo = portionBuilder.GetPortionResult();
+ switch (portionInfo.GetMeta().Produced) {
case NOlap::TPortionMeta::EProduced::UNSPECIFIED:
Y_ABORT_UNLESS(false); // unexpected
case NOlap::TPortionMeta::EProduced::INSERTED:
@@ -79,22 +79,25 @@ void TChangesWithAppend::DoWriteIndexOnComplete(NColumnShard::TColumnShard* self
const TPortionInfo& oldInfo = context.EngineLogs.GetGranuleVerified(portionInfo.GetPathId()).GetPortionVerified(portionInfo.GetPortion());
context.EngineLogs.UpsertPortion(portionInfo, &oldInfo);
}
- for (auto& portionInfoWithBlobs : AppendedPortions) {
- auto& portionInfo = portionInfoWithBlobs.GetPortionInfo();
- context.EngineLogs.UpsertPortion(portionInfo);
+ for (auto& portionBuilder : AppendedPortions) {
+ context.EngineLogs.UpsertPortion(portionBuilder.GetPortionResult());
}
}
}
void TChangesWithAppend::DoCompile(TFinalizationContext& context) {
for (auto&& i : AppendedPortions) {
- i.GetPortionInfo().SetPortion(context.NextPortionId());
- i.GetPortionInfo().UpdateRecordsMeta(TPortionMeta::EProduced::INSERTED);
+ i.GetPortionConstructor().SetPortionId(context.NextPortionId());
+ i.GetPortionConstructor().MutableMeta().UpdateRecordsMeta(TPortionMeta::EProduced::INSERTED);
}
for (auto& [_, portionInfo] : PortionsToRemove) {
- if (!portionInfo.HasRemoveSnapshot()) {
- portionInfo.SetRemoveSnapshot(context.GetSnapshot());
- }
+ portionInfo.SetRemoveSnapshot(context.GetSnapshot());
+ }
+}
+
+void TChangesWithAppend::DoOnAfterCompile() {
+ for (auto&& i : AppendedPortions) {
+ i.FinalizePortionConstructor();
}
}
@@ -134,7 +137,8 @@ std::vector<TWritePortionInfoWithBlobs> TChangesWithAppend::MakeAppendedPortions
out.back().FillStatistics(resultSchema->GetIndexInfo());
NArrow::TFirstLastSpecialKeys primaryKeys(slice.GetFirstLastPKBatch(resultSchema->GetIndexInfo().GetReplaceKey()));
NArrow::TMinMaxSpecialKeys snapshotKeys(b, TIndexInfo::ArrowSchemaSnapshot());
- out.back().GetPortionInfo().AddMetadata(*resultSchema, primaryKeys, snapshotKeys, IStoragesManager::DefaultStorageId);
+ out.back().GetPortionConstructor().AddMetadata(*resultSchema, b);
+ out.back().GetPortionConstructor().MutableMeta().SetTierName(IStoragesManager::DefaultStorageId);
recordIdx += slice.GetRecordsCount();
}
}
diff --git a/ydb/core/tx/columnshard/engines/changes/with_appended.h b/ydb/core/tx/columnshard/engines/changes/with_appended.h
index 1a99a11135f..4c5fbc2189c 100644
--- a/ydb/core/tx/columnshard/engines/changes/with_appended.h
+++ b/ydb/core/tx/columnshard/engines/changes/with_appended.h
@@ -9,10 +9,11 @@ namespace NKikimr::NOlap {
class TChangesWithAppend: public TColumnEngineChanges {
private:
using TBase = TColumnEngineChanges;
-
+ THashMap<TPortionAddress, TPortionInfo> PortionsToRemove;
protected:
TSaverContext SaverContext;
virtual void DoCompile(TFinalizationContext& context) override;
+ virtual void DoOnAfterCompile() override;
virtual void DoWriteIndexOnExecute(NColumnShard::TColumnShard* self, TWriteIndexContext& context) override;
virtual void DoWriteIndexOnComplete(NColumnShard::TColumnShard* self, TWriteIndexCompleteContext& context) override;
virtual void DoStart(NColumnShard::TColumnShard& self) override;
@@ -35,7 +36,6 @@ protected:
return selfLock;
}
}
-
public:
TChangesWithAppend(const TSaverContext& saverContext, const NBlobOperations::EConsumer consumerId)
: TBase(saverContext.GetStoragesManager(), consumerId)
@@ -44,7 +44,23 @@ public:
}
- THashMap<TPortionAddress, TPortionInfo> PortionsToRemove;
+ const THashMap<TPortionAddress, TPortionInfo>& GetPortionsToRemove() const {
+ return PortionsToRemove;
+ }
+
+ ui32 GetPortionsToRemoveSize() const {
+ return PortionsToRemove.size();
+ }
+
+ bool HasPortionsToRemove() const {
+ return PortionsToRemove.size();
+ }
+
+ void AddPortionToRemove(const TPortionInfo& info) {
+ AFL_VERIFY(!info.HasRemoveSnapshot());
+ AFL_VERIFY(PortionsToRemove.emplace(info.GetAddress(), info).second);
+ }
+
std::vector<TWritePortionInfoWithBlobs> AppendedPortions;
virtual ui32 GetWritePortionsCount() const override {
return AppendedPortions.size();
diff --git a/ydb/core/tx/columnshard/engines/column_engine.cpp b/ydb/core/tx/columnshard/engines/column_engine.cpp
index d6f46742093..0b1979f537b 100644
--- a/ydb/core/tx/columnshard/engines/column_engine.cpp
+++ b/ydb/core/tx/columnshard/engines/column_engine.cpp
@@ -1,4 +1,5 @@
#include "column_engine.h"
+#include "portions/portion_info.h"
#include <ydb/core/base/appdata.h>
#include <util/system/info.h>
@@ -21,4 +22,37 @@ ui64 IColumnEngine::GetMetadataLimit() {
}
}
+size_t TSelectInfo::NumChunks() const {
+ size_t records = 0;
+ for (auto& portionInfo : PortionsOrderedPK) {
+ records += portionInfo->NumChunks();
+ }
+ return records;
+}
+
+TSelectInfo::TStats TSelectInfo::Stats() const {
+ TStats out;
+ out.Portions = PortionsOrderedPK.size();
+
+ THashSet<TUnifiedBlobId> uniqBlob;
+ for (auto& portionInfo : PortionsOrderedPK) {
+ out.Records += portionInfo->NumChunks();
+ out.Rows += portionInfo->NumRows();
+ for (auto& rec : portionInfo->Records) {
+ out.Bytes += rec.BlobRange.Size;
+ }
+ out.Blobs += portionInfo->GetBlobIdsCount();
+ }
+ return out;
+}
+
+void TSelectInfo::DebugStream(IOutputStream& out) {
+ if (PortionsOrderedPK.size()) {
+ out << "portions:";
+ for (auto& portionInfo : PortionsOrderedPK) {
+ out << portionInfo->DebugString();
+ }
+ }
+}
+
}
diff --git a/ydb/core/tx/columnshard/engines/column_engine.h b/ydb/core/tx/columnshard/engines/column_engine.h
index aba511eec9b..58a2d8eb310 100644
--- a/ydb/core/tx/columnshard/engines/column_engine.h
+++ b/ydb/core/tx/columnshard/engines/column_engine.h
@@ -1,9 +1,9 @@
#pragma once
#include "db_wrapper.h"
-#include "scheme/snapshot_scheme.h"
-#include "predicate/filter.h"
#include "changes/abstract/settings.h"
#include "changes/abstract/compaction_info.h"
+#include "predicate/filter.h"
+#include "scheme/snapshot_scheme.h"
#include "scheme/versions/versioned_index.h"
#include <ydb/core/tx/columnshard/common/reverse_accessor.h>
@@ -49,39 +49,11 @@ struct TSelectInfo {
return NColumnShard::TContainerAccessorWithDirection<std::vector<std::shared_ptr<TPortionInfo>>>(PortionsOrderedPK, reverse);
}
- size_t NumChunks() const {
- size_t records = 0;
- for (auto& portionInfo : PortionsOrderedPK) {
- records += portionInfo->NumChunks();
- }
- return records;
- }
+ size_t NumChunks() const;
- TStats Stats() const {
- TStats out;
- out.Portions = PortionsOrderedPK.size();
+ TStats Stats() const;
- THashSet<TUnifiedBlobId> uniqBlob;
- for (auto& portionInfo : PortionsOrderedPK) {
- out.Records += portionInfo->NumChunks();
- out.Rows += portionInfo->NumRows();
- for (auto& rec : portionInfo->Records) {
- out.Bytes += rec.BlobRange.Size;
- }
- out.Blobs += portionInfo->GetBlobIdsCount();
- }
- return out;
- }
-
- friend IOutputStream& operator << (IOutputStream& out, const TSelectInfo& info) {
- if (info.PortionsOrderedPK.size()) {
- out << "portions:";
- for (auto& portionInfo : info.PortionsOrderedPK) {
- out << portionInfo->DebugString();
- }
- }
- return out;
- }
+ void DebugStream(IOutputStream& out);
};
class TColumnEngineStats {
@@ -317,7 +289,8 @@ public:
virtual std::shared_ptr<TCleanupTablesColumnEngineChanges> StartCleanupTables(THashSet<ui64>& pathsToDrop) noexcept = 0;
virtual std::vector<std::shared_ptr<TTTLColumnEngineChanges>> StartTtl(const THashMap<ui64, TTiering>& pathEviction,
const std::shared_ptr<NDataLocks::TManager>& dataLocksManager, const ui64 memoryUsageLimit) noexcept = 0;
- virtual bool ApplyChanges(IDbWrapper& db, std::shared_ptr<TColumnEngineChanges> changes, const TSnapshot& snapshot) noexcept = 0;
+ virtual bool ApplyChangesOnTxCreate(std::shared_ptr<TColumnEngineChanges> changes, const TSnapshot& snapshot) noexcept = 0;
+ virtual bool ApplyChangesOnExecute(IDbWrapper& db, std::shared_ptr<TColumnEngineChanges> changes, const TSnapshot& snapshot) noexcept = 0;
virtual void RegisterSchemaVersion(const TSnapshot& snapshot, TIndexInfo&& info) = 0;
virtual void RegisterSchemaVersion(const TSnapshot& snapshot, const NKikimrSchemeOp::TColumnTableSchema& schema) = 0;
virtual const TMap<ui64, std::shared_ptr<TColumnEngineStats>>& GetStats() const = 0;
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
index 2be9f777137..49e039aa1df 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
@@ -7,6 +7,7 @@
#include "changes/cleanup_portions.h"
#include "changes/cleanup_tables.h"
#include "changes/ttl.h"
+#include "portions/constructor.h"
#include <ydb/core/tx/columnshard/common/limits.h>
#include <ydb/core/tx/columnshard/hooks/abstract/abstract.h>
@@ -185,11 +186,12 @@ bool TColumnEngineForLogs::Load(IDbWrapper& db) {
}
bool TColumnEngineForLogs::LoadColumns(IDbWrapper& db) {
+ TPortionConstructors constructors;
{
- if (!db.LoadPortions([&](TPortionInfo&& portion, const NKikimrTxColumnShard::TIndexPortionMeta& metaProto) {
+ if (!db.LoadPortions([&](TPortionInfoConstructor&& portion, const NKikimrTxColumnShard::TIndexPortionMeta& metaProto) {
const TIndexInfo& indexInfo = portion.GetSchema(VersionedIndex)->GetIndexInfo();
- portion.LoadMetadata(metaProto, indexInfo);
- GetGranulePtrVerified(portion.GetPathId())->UpsertPortionOnLoad(std::move(portion));
+ AFL_VERIFY(portion.MutableMeta().LoadMetadata(metaProto, indexInfo));
+ AFL_VERIFY(constructors.AddConstructorVerified(std::move(portion)));
})) {
return false;
}
@@ -197,25 +199,28 @@ bool TColumnEngineForLogs::LoadColumns(IDbWrapper& db) {
{
TPortionInfo::TSchemaCursor schema(VersionedIndex);
- if (!db.LoadColumns([&](const TPortionInfo& portion, const TColumnChunkLoadContext& loadContext) {
+ if (!db.LoadColumns([&](TPortionInfoConstructor&& portion, const TColumnChunkLoadContext& loadContext) {
+ auto* constructor = constructors.MergeConstructor(std::move(portion));
auto currentSchema = schema.GetSchema(portion);
- AFL_VERIFY(portion.ValidSnapshotInfo())("details", portion.DebugString());
- // Locate granule and append the record.
- GetGranulePtrVerified(portion.GetPathId())->AddColumnRecordOnLoad(currentSchema->GetIndexInfo(), portion, loadContext, loadContext.GetPortionMeta());
+ constructor->LoadRecord(currentSchema->GetIndexInfo(), loadContext);
})) {
return false;
}
}
-
- if (!db.LoadIndexes([&](const ui64 pathId, const ui64 portionId, const TIndexChunkLoadContext& loadContext) {
- auto portion = GetGranulePtrVerified(pathId)->GetPortionOptional(portionId);
- AFL_VERIFY(portion);
- const auto linkBlobId = portion->RegisterBlobId(loadContext.GetBlobRange().GetBlobId());
- portion->AddIndex(loadContext.BuildIndexChunk(linkBlobId));
- })) {
- return false;
- };
-
+ {
+ if (!db.LoadIndexes([&](const ui64 pathId, const ui64 portionId, const TIndexChunkLoadContext& loadContext) {
+ auto* constructor = constructors.GetConstructorVerified(pathId, portionId);
+ constructor->LoadIndex(loadContext);
+ })) {
+ return false;
+ };
+ }
+ for (auto&& [granuleId, pathConstructors] : constructors) {
+ auto g = GetGranulePtrVerified(granuleId);
+ for (auto&& [portionId, constructor] : pathConstructors) {
+ g->UpsertPortionOnLoad(constructor.Build(false));
+ }
+ }
for (auto&& i : GranulesStorage->GetTables()) {
i.second->OnAfterPortionsLoad();
}
@@ -413,18 +418,20 @@ std::vector<std::shared_ptr<TTTLColumnEngineChanges>> TColumnEngineForLogs::Star
std::vector<std::shared_ptr<TTTLColumnEngineChanges>> result;
for (auto&& i : context.GetTasks()) {
for (auto&& t : i.second) {
- SignalCounters.OnActualizationTask(t.GetTask()->GetPortionsToEvictCount(), t.GetTask()->PortionsToRemove.size());
+ SignalCounters.OnActualizationTask(t.GetTask()->GetPortionsToEvictCount(), t.GetTask()->GetPortionsToRemoveSize());
result.emplace_back(t.GetTask());
}
}
return result;
}
-bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, std::shared_ptr<TColumnEngineChanges> indexChanges, const TSnapshot& snapshot) noexcept {
- {
- TFinalizationContext context(LastGranule, LastPortion, snapshot);
- indexChanges->Compile(context);
- }
+bool TColumnEngineForLogs::ApplyChangesOnTxCreate(std::shared_ptr<TColumnEngineChanges> indexChanges, const TSnapshot& snapshot) noexcept {
+ TFinalizationContext context(LastGranule, LastPortion, snapshot);
+ indexChanges->Compile(context);
+ return true;
+}
+
+bool TColumnEngineForLogs::ApplyChangesOnExecute(IDbWrapper& db, std::shared_ptr<TColumnEngineChanges> /*indexChanges*/, const TSnapshot& snapshot) noexcept {
db.WriteCounter(LAST_PORTION, LastPortion);
db.WriteCounter(LAST_GRANULE, LastGranule);
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.h b/ydb/core/tx/columnshard/engines/column_engine_logs.h
index d991a6878f7..017ef3cbe84 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.h
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.h
@@ -114,8 +114,8 @@ public:
void ReturnToIndexes(const THashMap<ui64, THashSet<ui64>>& portions) const {
return GranulesStorage->ReturnToIndexes(portions);
}
- bool ApplyChanges(IDbWrapper& db, std::shared_ptr<TColumnEngineChanges> indexChanges,
- const TSnapshot& snapshot) noexcept override;
+ virtual bool ApplyChangesOnTxCreate(std::shared_ptr<TColumnEngineChanges> indexChanges, const TSnapshot& snapshot) noexcept override;
+ virtual bool ApplyChangesOnExecute(IDbWrapper& db, std::shared_ptr<TColumnEngineChanges> indexChanges, const TSnapshot& snapshot) noexcept override;
void RegisterSchemaVersion(const TSnapshot& snapshot, TIndexInfo&& info) override;
void RegisterSchemaVersion(const TSnapshot& snapshot, const NKikimrSchemeOp::TColumnTableSchema& schema) override;
diff --git a/ydb/core/tx/columnshard/engines/db_wrapper.cpp b/ydb/core/tx/columnshard/engines/db_wrapper.cpp
index 535c79fd9e2..4bc8f9bf093 100644
--- a/ydb/core/tx/columnshard/engines/db_wrapper.cpp
+++ b/ydb/core/tx/columnshard/engines/db_wrapper.cpp
@@ -1,5 +1,6 @@
#include "defs.h"
#include "db_wrapper.h"
+#include "portions/constructor.h"
#include <ydb/core/tx/columnshard/columnshard_schema.h>
namespace NKikimr::NOlap {
@@ -48,7 +49,7 @@ void TDbWrapper::WriteColumn(const NOlap::TPortionInfo& portion, const TColumnRe
}
using IndexColumns = NColumnShard::Schema::IndexColumns;
auto removeSnapshot = portion.GetRemoveSnapshotOptional();
- db.Table<IndexColumns>().Key(0, portion.GetDeprecatedGranuleId(), row.ColumnId,
+ db.Table<IndexColumns>().Key(0, portion.GetPathId(), row.ColumnId,
portion.GetMinSnapshotDeprecated().GetPlanStep(), portion.GetMinSnapshotDeprecated().GetTxId(), portion.GetPortion(), row.Chunk).Update(
NIceDb::TUpdate<IndexColumns::XPlanStep>(removeSnapshot ? removeSnapshot->GetPlanStep() : 0),
NIceDb::TUpdate<IndexColumns::XTxId>(removeSnapshot ? removeSnapshot->GetTxId() : 0),
@@ -81,11 +82,11 @@ void TDbWrapper::ErasePortion(const NOlap::TPortionInfo& portion) {
void TDbWrapper::EraseColumn(const NOlap::TPortionInfo& portion, const TColumnRecord& row) {
NIceDb::TNiceDb db(Database);
using IndexColumns = NColumnShard::Schema::IndexColumns;
- db.Table<IndexColumns>().Key(0, portion.GetDeprecatedGranuleId(), row.ColumnId,
+ db.Table<IndexColumns>().Key(0, portion.GetPathId(), row.ColumnId,
portion.GetMinSnapshotDeprecated().GetPlanStep(), portion.GetMinSnapshotDeprecated().GetTxId(), portion.GetPortion(), row.Chunk).Delete();
}
-bool TDbWrapper::LoadColumns(const std::function<void(const NOlap::TPortionInfo&, const TColumnChunkLoadContext&)>& callback) {
+bool TDbWrapper::LoadColumns(const std::function<void(NOlap::TPortionInfoConstructor&&, const TColumnChunkLoadContext&)>& callback) {
NIceDb::TNiceDb db(Database);
using IndexColumns = NColumnShard::Schema::IndexColumns;
auto rowset = db.Table<IndexColumns>().Prefix(0).Select();
@@ -94,17 +95,15 @@ bool TDbWrapper::LoadColumns(const std::function<void(const NOlap::TPortionInfo&
}
while (!rowset.EndOfSet()) {
- NOlap::TPortionInfo portion = NOlap::TPortionInfo::BuildEmpty();
- portion.SetPathId(rowset.GetValue<IndexColumns::PathId>());
- portion.SetPortion(rowset.GetValue<IndexColumns::Portion>());
- portion.SetMinSnapshotDeprecated(NOlap::TSnapshot(rowset.GetValue<IndexColumns::PlanStep>(), rowset.GetValue<IndexColumns::TxId>()));
- portion.SetDeprecatedGranuleId(rowset.GetValue<IndexColumns::Granule>());
+ NOlap::TSnapshot minSnapshot(rowset.GetValue<IndexColumns::PlanStep>(), rowset.GetValue<IndexColumns::TxId>());
+ NOlap::TSnapshot removeSnapshot(rowset.GetValue<IndexColumns::XPlanStep>(), rowset.GetValue<IndexColumns::XTxId>());
- NOlap::TColumnChunkLoadContext chunkLoadContext(rowset, DsGroupSelector);
-
- portion.SetRemoveSnapshot(rowset.GetValue<IndexColumns::XPlanStep>(), rowset.GetValue<IndexColumns::XTxId>());
+ NOlap::TPortionInfoConstructor constructor(rowset.GetValue<IndexColumns::PathId>(), rowset.GetValue<IndexColumns::Portion>());
+ constructor.SetMinSnapshotDeprecated(minSnapshot);
+ constructor.SetRemoveSnapshot(removeSnapshot);
- callback(portion, chunkLoadContext);
+ NOlap::TColumnChunkLoadContext chunkLoadContext(rowset, DsGroupSelector);
+ callback(std::move(constructor), chunkLoadContext);
if (!rowset.Next()) {
return false;
@@ -113,7 +112,7 @@ bool TDbWrapper::LoadColumns(const std::function<void(const NOlap::TPortionInfo&
return true;
}
-bool TDbWrapper::LoadPortions(const std::function<void(NOlap::TPortionInfo&&, const NKikimrTxColumnShard::TIndexPortionMeta&)>& callback) {
+bool TDbWrapper::LoadPortions(const std::function<void(NOlap::TPortionInfoConstructor&&, const NKikimrTxColumnShard::TIndexPortionMeta&)>& callback) {
NIceDb::TNiceDb db(Database);
using IndexPortions = NColumnShard::Schema::IndexPortions;
auto rowset = db.Table<IndexPortions>().Select();
@@ -122,9 +121,7 @@ bool TDbWrapper::LoadPortions(const std::function<void(NOlap::TPortionInfo&&, co
}
while (!rowset.EndOfSet()) {
- NOlap::TPortionInfo portion = NOlap::TPortionInfo::BuildEmpty();
- portion.SetPathId(rowset.GetValue<IndexPortions::PathId>());
- portion.SetPortion(rowset.GetValue<IndexPortions::PortionId>());
+ NOlap::TPortionInfoConstructor portion(rowset.GetValue<IndexPortions::PathId>(), rowset.GetValue<IndexPortions::PortionId>());
portion.SetSchemaVersion(rowset.GetValue<IndexPortions::SchemaVersion>());
portion.SetRemoveSnapshot(rowset.GetValue<IndexPortions::XPlanStep>(), rowset.GetValue<IndexPortions::XTxId>());
diff --git a/ydb/core/tx/columnshard/engines/db_wrapper.h b/ydb/core/tx/columnshard/engines/db_wrapper.h
index 1f4990da1cc..d9435913bd1 100644
--- a/ydb/core/tx/columnshard/engines/db_wrapper.h
+++ b/ydb/core/tx/columnshard/engines/db_wrapper.h
@@ -21,6 +21,7 @@ class TIndexChunk;
struct TGranuleRecord;
class IColumnEngine;
class TPortionInfo;
+class TPortionInfoConstructor;
class IDbWrapper {
public:
@@ -38,11 +39,11 @@ public:
virtual void WriteColumn(const TPortionInfo& portion, const TColumnRecord& row, const ui32 firstPKColumnId) = 0;
virtual void EraseColumn(const TPortionInfo& portion, const TColumnRecord& row) = 0;
- virtual bool LoadColumns(const std::function<void(const TPortionInfo&, const TColumnChunkLoadContext&)>& callback) = 0;
+ virtual bool LoadColumns(const std::function<void(NOlap::TPortionInfoConstructor&&, const TColumnChunkLoadContext&)>& callback) = 0;
virtual void WritePortion(const NOlap::TPortionInfo& portion) = 0;
virtual void ErasePortion(const NOlap::TPortionInfo& portion) = 0;
- virtual bool LoadPortions(const std::function<void(NOlap::TPortionInfo&&, const NKikimrTxColumnShard::TIndexPortionMeta&)>& callback) = 0;
+ virtual bool LoadPortions(const std::function<void(NOlap::TPortionInfoConstructor&&, const NKikimrTxColumnShard::TIndexPortionMeta&)>& callback) = 0;
virtual void WriteIndex(const TPortionInfo& portion, const TIndexChunk& row) = 0;
virtual void EraseIndex(const TPortionInfo& portion, const TIndexChunk& row) = 0;
@@ -70,11 +71,11 @@ public:
void WritePortion(const NOlap::TPortionInfo& portion) override;
void ErasePortion(const NOlap::TPortionInfo& portion) override;
- bool LoadPortions(const std::function<void(NOlap::TPortionInfo&&, const NKikimrTxColumnShard::TIndexPortionMeta&)>& callback) override;
+ bool LoadPortions(const std::function<void(NOlap::TPortionInfoConstructor&&, const NKikimrTxColumnShard::TIndexPortionMeta&)>& callback) override;
void WriteColumn(const NOlap::TPortionInfo& portion, const TColumnRecord& row, const ui32 firstPKColumnId) override;
void EraseColumn(const NOlap::TPortionInfo& portion, const TColumnRecord& row) override;
- bool LoadColumns(const std::function<void(const NOlap::TPortionInfo&, const TColumnChunkLoadContext&)>& callback) override;
+ bool LoadColumns(const std::function<void(NOlap::TPortionInfoConstructor&&, const TColumnChunkLoadContext&)>& callback) override;
virtual void WriteIndex(const TPortionInfo& portion, const TIndexChunk& row) override;
virtual void EraseIndex(const TPortionInfo& portion, const TIndexChunk& row) override;
diff --git a/ydb/core/tx/columnshard/engines/portions/column_record.h b/ydb/core/tx/columnshard/engines/portions/column_record.h
index 9d958055cbf..7b4e54f5154 100644
--- a/ydb/core/tx/columnshard/engines/portions/column_record.h
+++ b/ydb/core/tx/columnshard/engines/portions/column_record.h
@@ -91,7 +91,7 @@ public:
}
void RegisterBlobIdx(const ui16 blobIdx) {
-// AFL_VERIFY(!BlobRange.BlobId.GetTabletId())("original", BlobRange.BlobId.ToStringNew())("new", blobId.ToStringNew());
+ AFL_VERIFY(!BlobRange.BlobIdx)("original", BlobRange.BlobIdx)("new", blobIdx);
BlobRange.BlobIdx = blobIdx;
}
diff --git a/ydb/core/tx/columnshard/engines/portions/constructor.cpp b/ydb/core/tx/columnshard/engines/portions/constructor.cpp
new file mode 100644
index 00000000000..f3de740f4f8
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/portions/constructor.cpp
@@ -0,0 +1,87 @@
+#include "constructor.h"
+#include <ydb/core/tx/columnshard/columnshard_schema.h>
+#include <ydb/core/tx/columnshard/engines/scheme/index_info.h>
+#include <ydb/core/tx/columnshard/engines/scheme/versions/abstract_scheme.h>
+#include <ydb/core/tx/columnshard/engines/scheme/versions/versioned_index.h>
+
+namespace NKikimr::NOlap {
+
+// Produces the final TPortionInfo from the state accumulated in this constructor.
+// Requires a non-zero path id, a portion id and a valid min snapshot; the remove snapshot
+// is transferred only when it is set (and must then be valid). When needChunksNormalization
+// is true, record and index chunks are sorted before validation. FullValidation() always
+// runs before the chunk vectors and blob ids are copied into the result.
+TPortionInfo TPortionInfoConstructor::Build(const bool needChunksNormalization) {
+    TPortionInfo portion(MetaConstructor.Build());
+
+    // Identity of the portion.
+    AFL_VERIFY(PathId);
+    portion.PathId = PathId;
+    portion.Portion = GetPortionIdVerified();
+
+    // Snapshots: the min snapshot is mandatory, the remove snapshot is optional.
+    AFL_VERIFY(MinSnapshotDeprecated);
+    AFL_VERIFY(MinSnapshotDeprecated->Valid());
+    portion.MinSnapshotDeprecated = *MinSnapshotDeprecated;
+    if (RemoveSnapshot) {
+        AFL_VERIFY(RemoveSnapshot->Valid());
+        portion.RemoveSnapshot = *RemoveSnapshot;
+    }
+    portion.SchemaVersion = SchemaVersion;
+
+    // Chunks are validated (optionally after sorting) before being handed over.
+    if (needChunksNormalization) {
+        ReorderChunks();
+    }
+    FullValidation();
+
+    portion.Indexes = Indexes;
+    portion.Records = Records;
+    portion.BlobIds = BlobIds;
+    return portion;
+}
+
+// Looks up the schema for this portion in the versioned index: by the explicit schema
+// version when one is set (the schema must then exist), otherwise by the min snapshot,
+// which must be set.
+ISnapshotSchema::TPtr TPortionInfoConstructor::GetSchema(const TVersionedIndex& index) const {
+    if (!SchemaVersion) {
+        AFL_VERIFY(MinSnapshotDeprecated);
+        return index.GetSchema(*MinSnapshotDeprecated);
+    }
+    auto schema = index.GetSchema(SchemaVersion.value());
+    AFL_VERIFY(!!schema)("details", TStringBuilder() << "cannot find schema for version " << SchemaVersion.value());
+    return schema;
+}
+
+// Appends one column record built from a loaded chunk row. The chunk's blob id is
+// registered in BlobIds first; if the row carries portion metadata, it is loaded into the
+// meta constructor and loading must succeed.
+void TPortionInfoConstructor::LoadRecord(const TIndexInfo& indexInfo, const TColumnChunkLoadContext& loadContext) {
+    const auto blobLinkId = RegisterBlobId(loadContext.GetBlobRange().GetBlobId());
+    TColumnRecord rec(blobLinkId, loadContext, indexInfo.GetColumnFeaturesVerified(loadContext.GetAddress().GetColumnId()));
+    Records.push_back(std::move(rec));
+
+    if (!loadContext.GetPortionMeta()) {
+        return;
+    }
+    AFL_VERIFY(MetaConstructor.LoadMetadata(*loadContext.GetPortionMeta(), indexInfo));
+}
+
+// Registers the blob id of a loaded index chunk and appends the chunk built for that link.
+void TPortionInfoConstructor::LoadIndex(const TIndexChunkLoadContext& loadContext) {
+    AddIndex(loadContext.BuildIndexChunk(RegisterBlobId(loadContext.GetBlobRange().GetBlobId())));
+}
+
+// Appends a chunk record for a column, enforcing that chunks of the same column are
+// numbered consecutively: the already stored chunks of that column must form an increasing
+// run in storage order, and the new record must continue it (or be chunk 0 for a column
+// with no stored chunks). Returns a reference to the stored record.
+const NKikimr::NOlap::TColumnRecord& TPortionInfoConstructor::AppendOneChunkColumn(TColumnRecord&& record) {
+    Y_ABORT_UNLESS(record.ColumnId);
+    std::optional<ui32> maxChunk;
+    for (auto&& i : Records) {
+        if (i.ColumnId != record.ColumnId) {
+            continue;
+        }
+        if (maxChunk) {
+            Y_ABORT_UNLESS(*maxChunk + 1 == i.Chunk);
+        }
+        maxChunk = i.Chunk;
+    }
+    if (!maxChunk) {
+        AFL_VERIFY(0 == record.Chunk)("record", record.Chunk);
+    } else {
+        AFL_VERIFY(*maxChunk + 1 == record.Chunk)("max", *maxChunk)("record", record.Chunk);
+    }
+    Records.emplace_back(std::move(record));
+    return Records.back();
+}
+
+// Fills the portion metadata from a record batch. The batch row count must equal
+// GetRecordsCount(); the first/last keys and the min/max snapshot keys are derived from the
+// batch and passed to the meta constructor together with the schema's index info.
+void TPortionInfoConstructor::AddMetadata(const ISnapshotSchema& snapshotSchema, const std::shared_ptr<arrow::RecordBatch>& batch) {
+    Y_ABORT_UNLESS(batch->num_rows() == GetRecordsCount());
+    const NArrow::TFirstLastSpecialKeys primaryKeys(batch);
+    const NArrow::TMinMaxSpecialKeys snapshotKeys(batch, TIndexInfo::ArrowSchemaSnapshot());
+    MetaConstructor.FillMetaInfo(primaryKeys, snapshotKeys, snapshotSchema.GetIndexInfo());
+}
+
+} \ No newline at end of file
diff --git a/ydb/core/tx/columnshard/engines/portions/constructor.h b/ydb/core/tx/columnshard/engines/portions/constructor.h
new file mode 100644
index 00000000000..d40380bdf4a
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/portions/constructor.h
@@ -0,0 +1,333 @@
+#pragma once
+#include "constructor_meta.h"
+#include "column_record.h"
+#include "index_chunk.h"
+#include "portion_info.h"
+
+#include <ydb/library/accessor/accessor.h>
+
+namespace NKikimr::NOlap {
+class TPortionInfo;
+class TVersionedIndex;
+class ISnapshotSchema;
+class TIndexChunkLoadContext;
+
+// Mutable builder for TPortionInfo. It accumulates the portion identity (path id, portion
+// id), snapshots, schema version, column records, index chunks, blob ids and metadata, and
+// produces the final TPortionInfo through Build().
+class TPortionInfoConstructor {
+private:
+    YDB_ACCESSOR(ui64, PathId, 0);
+    // Stays unset when the object is created from a path id only; assigned later through
+    // SetPortionId().
+    std::optional<ui64> PortionId;
+
+    TPortionMetaConstructor MetaConstructor;
+
+    std::optional<TSnapshot> MinSnapshotDeprecated;
+    std::optional<TSnapshot> RemoveSnapshot;
+    std::optional<ui64> SchemaVersion;
+
+    std::vector<TIndexChunk> Indexes;
+    YDB_ACCESSOR_DEF(std::vector<TColumnRecord>, Records);
+    // Records and index chunks refer to blobs by their position in this vector.
+    std::vector<TUnifiedBlobId> BlobIds;
+public:
+    // Assigns the portion id; zero is rejected.
+    void SetPortionId(const ui64 value) {
+        AFL_VERIFY(value);
+        PortionId = value;
+    }
+
+    // Fills metadata from a record batch (defined in constructor.cpp).
+    void AddMetadata(const ISnapshotSchema& snapshotSchema, const std::shared_ptr<arrow::RecordBatch>& batch);
+
+    // Fills metadata from precomputed first/last and min/max special keys.
+    void AddMetadata(const ISnapshotSchema& snapshotSchema, const NArrow::TFirstLastSpecialKeys& firstLastRecords, const NArrow::TMinMaxSpecialKeys& minMaxSpecial) {
+        MetaConstructor.FillMetaInfo(firstLastRecords, minMaxSpecial, snapshotSchema.GetIndexInfo());
+    }
+
+    // Returns the portion id, verifying that it is set and non-zero.
+    ui64 GetPortionIdVerified() const {
+        AFL_VERIFY(PortionId);
+        AFL_VERIFY(*PortionId);
+        return *PortionId;
+    }
+
+    TPortionMetaConstructor& MutableMeta() {
+        return MetaConstructor;
+    }
+
+    const TPortionMetaConstructor& GetMeta() const {
+        return MetaConstructor;
+    }
+
+    // Starts from an existing portion. Metadata is copied only when withMetadata is true;
+    // index chunks, records and blob ids are copied only when withBlobs is true.
+    TPortionInfoConstructor(const TPortionInfo& portion, const bool withBlobs, const bool withMetadata)
+        : PathId(portion.GetPathId())
+        , PortionId(portion.GetPortionId())
+        , MinSnapshotDeprecated(portion.GetMinSnapshotDeprecated())
+        , RemoveSnapshot(portion.GetRemoveSnapshotOptional())
+        , SchemaVersion(portion.GetSchemaVersionOptional())
+    {
+        if (withMetadata) {
+            MetaConstructor = TPortionMetaConstructor(portion.Meta);
+        }
+        if (withBlobs) {
+            Indexes = portion.GetIndexes();
+            Records = portion.GetRecords();
+            BlobIds = portion.BlobIds;
+        }
+    }
+
+    // Takes over an existing portion: metadata is copied, chunk vectors and blob ids are moved.
+    TPortionInfoConstructor(TPortionInfo&& portion)
+        : PathId(portion.GetPathId())
+        , PortionId(portion.GetPortionId())
+        , MinSnapshotDeprecated(portion.GetMinSnapshotDeprecated())
+        , RemoveSnapshot(portion.GetRemoveSnapshotOptional())
+        , SchemaVersion(portion.GetSchemaVersionOptional())
+    {
+        MetaConstructor = TPortionMetaConstructor(portion.Meta);
+
+        Indexes = std::move(portion.Indexes);
+        Records = std::move(portion.Records);
+        BlobIds = std::move(portion.BlobIds);
+    }
+
+    TPortionAddress GetAddress() const {
+        return TPortionAddress(PathId, GetPortionIdVerified());
+    }
+
+    bool HasRemoveSnapshot() const {
+        return !!RemoveSnapshot;
+    }
+
+    // Verifies that chunks are grouped by strictly increasing non-zero entity id and that,
+    // within one entity, chunk indexes start at 0 and increase by exactly one.
+    template <class TChunkInfo>
+    static void CheckChunksOrder(const std::vector<TChunkInfo>& chunks) {
+        ui32 entityId = 0;
+        ui32 chunkIdx = 0;
+        for (auto&& i : chunks) {
+            if (entityId != i.GetEntityId()) {
+                AFL_VERIFY(entityId < i.GetEntityId());
+                AFL_VERIFY(i.GetChunkIdx() == 0);
+                entityId = i.GetEntityId();
+                chunkIdx = 0;
+            } else {
+                AFL_VERIFY(i.GetChunkIdx() == chunkIdx + 1);
+                chunkIdx = i.GetChunkIdx();
+            }
+            AFL_VERIFY(i.GetEntityId());
+        }
+    }
+
+    // Reconciles identity and snapshots with another constructor for the same portion.
+    // Path id and portion id must match; each snapshot is adopted when missing here and must
+    // be equal when present on both sides. Records, index chunks, blob ids, schema version
+    // and metadata of `item` are not transferred.
+    void Merge(TPortionInfoConstructor&& item) {
+        AFL_VERIFY(item.PathId == PathId);
+        AFL_VERIFY(item.PortionId == PortionId);
+        if (item.MinSnapshotDeprecated) {
+            if (MinSnapshotDeprecated) {
+                AFL_VERIFY(*MinSnapshotDeprecated == *item.MinSnapshotDeprecated);
+            } else {
+                MinSnapshotDeprecated = item.MinSnapshotDeprecated;
+            }
+        }
+        if (item.RemoveSnapshot) {
+            if (RemoveSnapshot) {
+                AFL_VERIFY(*RemoveSnapshot == *item.RemoveSnapshot);
+            } else {
+                RemoveSnapshot = item.RemoveSnapshot;
+            }
+        }
+    }
+
+    // Creates a constructor for a known portion; both ids must be non-zero.
+    TPortionInfoConstructor(const ui64 pathId, const ui64 portionId)
+        : PathId(pathId)
+        , PortionId(portionId) {
+        AFL_VERIFY(PathId);
+        // PortionId is an always-engaged optional at this point, so the stored value has to be
+        // checked (testing the optional itself could never fail). This matches SetPortionId()
+        // and GetPortionIdVerified(), which both reject a zero id.
+        AFL_VERIFY(*PortionId);
+    }
+
+    // Creates a constructor for a portion whose id is not assigned yet (see SetPortionId()).
+    TPortionInfoConstructor(const ui64 pathId)
+        : PathId(pathId) {
+        AFL_VERIFY(PathId);
+    }
+
+    const TSnapshot& GetMinSnapshotDeprecatedVerified() const {
+        AFL_VERIFY(!!MinSnapshotDeprecated);
+        return *MinSnapshotDeprecated;
+    }
+
+    // Resolves the schema by schema version when set, otherwise by the min snapshot
+    // (defined in constructor.cpp).
+    std::shared_ptr<ISnapshotSchema> GetSchema(const TVersionedIndex& index) const;
+
+    void SetMinSnapshotDeprecated(const TSnapshot& snap) {
+        Y_ABORT_UNLESS(snap.Valid());
+        MinSnapshotDeprecated = snap;
+    }
+
+    void SetSchemaVersion(const ui64 version) {
+//        AFL_VERIFY(version); engines/ut
+        SchemaVersion = version;
+    }
+
+    // Stores the remove snapshot only when it is valid; an invalid snapshot leaves it unset.
+    // May be called only while no remove snapshot is stored.
+    void SetRemoveSnapshot(const TSnapshot& snap) {
+        AFL_VERIFY(!RemoveSnapshot);
+        if (snap.Valid()) {
+            RemoveSnapshot = snap;
+        }
+    }
+
+    void SetRemoveSnapshot(const ui64 planStep, const ui64 txId) {
+        SetRemoveSnapshot(TSnapshot(planStep, txId));
+    }
+
+    // Appends a record from a loaded column chunk row (defined in constructor.cpp).
+    void LoadRecord(const TIndexInfo& indexInfo, const TColumnChunkLoadContext& loadContext);
+
+    // Sums the row counts of the chunks that belong to the first column id found in Records.
+    // At least one record must exist.
+    ui32 GetRecordsCount() const {
+        ui32 result = 0;
+        std::optional<ui32> columnIdFirst;
+        for (auto&& i : Records) {
+            if (!columnIdFirst || *columnIdFirst == i.ColumnId) {
+                result += i.GetMeta().GetNumRows();
+                columnIdFirst = i.ColumnId;
+            }
+        }
+        AFL_VERIFY(columnIdFirst);
+        return result;
+    }
+
+    // Returns the position of blobId in BlobIds, appending it when it is not present yet.
+    TBlobRangeLink16::TLinkId RegisterBlobId(const TUnifiedBlobId& blobId) {
+        AFL_VERIFY(blobId.IsValid());
+        TBlobRangeLink16::TLinkId idx = 0;
+        for (auto&& i : BlobIds) {
+            if (i == blobId) {
+                return idx;
+            }
+            ++idx;
+        }
+        BlobIds.emplace_back(blobId);
+        return idx;
+    }
+
+    const TBlobRange RestoreBlobRange(const TBlobRangeLink16& linkRange) const {
+        return linkRange.RestoreRange(GetBlobId(linkRange.GetBlobIdxVerified()));
+    }
+
+    const TUnifiedBlobId& GetBlobId(const TBlobRangeLink16::TLinkId linkId) const {
+        AFL_VERIFY(linkId < BlobIds.size());
+        return BlobIds[linkId];
+    }
+
+    ui32 GetBlobIdsCount() const {
+        return BlobIds.size();
+    }
+
+    // Registers blobIdx on the chunk with the given address: column records are searched
+    // first, then index chunks. Fails verification when no chunk has that address.
+    void RegisterBlobIdx(const TChunkAddress& address, const TBlobRangeLink16::TLinkId blobIdx) {
+        for (auto&& i : Records) {
+            if (i.GetColumnId() == address.GetEntityId() && i.GetChunkIdx() == address.GetChunkIdx()) {
+                i.RegisterBlobIdx(blobIdx);
+                return;
+            }
+        }
+        for (auto&& i : Indexes) {
+            if (i.GetIndexId() == address.GetEntityId() && i.GetChunkIdx() == address.GetChunkIdx()) {
+                i.RegisterBlobIdx(blobIdx);
+                return;
+            }
+        }
+        AFL_VERIFY(false)("problem", "portion haven't address for blob registration")("address", address.DebugString());
+    }
+
+    // Currently returns an empty string: nothing is written to the builder.
+    TString DebugString() const {
+        TStringBuilder sb;
+        return sb;
+    }
+
+    // Sorts records and index chunks by address and verifies the resulting chunk order.
+    void ReorderChunks() {
+        {
+            auto pred = [](const TColumnRecord& l, const TColumnRecord& r) {
+                return l.GetAddress() < r.GetAddress();
+            };
+            std::sort(Records.begin(), Records.end(), pred);
+            CheckChunksOrder(Records);
+        }
+        {
+            auto pred = [](const TIndexChunk& l, const TIndexChunk& r) {
+                return l.GetAddress() < r.GetAddress();
+            };
+            std::sort(Indexes.begin(), Indexes.end(), pred);
+            CheckChunksOrder(Indexes);
+        }
+    }
+
+    // Verifies that records exist, that records and index chunks are correctly ordered, and
+    // that the blob indexes they reference are exactly 0..BlobIds.size()-1 (or that none are
+    // referenced when BlobIds is empty).
+    void FullValidation() const {
+        AFL_VERIFY(Records.size());
+        CheckChunksOrder(Records);
+        CheckChunksOrder(Indexes);
+        std::set<ui32> blobIdxs;
+        for (auto&& i : Records) {
+            blobIdxs.emplace(i.GetBlobRange().GetBlobIdxVerified());
+        }
+        for (auto&& i : Indexes) {
+            blobIdxs.emplace(i.GetBlobRange().GetBlobIdxVerified());
+        }
+        if (BlobIds.size()) {
+            // The size check runs first, so blobIdxs is non-empty when rbegin() is dereferenced.
+            AFL_VERIFY(BlobIds.size() == blobIdxs.size());
+            AFL_VERIFY(BlobIds.size() == *blobIdxs.rbegin() + 1);
+        } else {
+            AFL_VERIFY(blobIdxs.empty());
+        }
+    }
+
+    // Appends an index chunk from a loaded row (defined in constructor.cpp).
+    void LoadIndex(const TIndexChunkLoadContext& loadContext);
+
+    // Appends a column chunk record, enforcing consecutive chunk numbering
+    // (defined in constructor.cpp).
+    const TColumnRecord& AppendOneChunkColumn(TColumnRecord&& record);
+
+    // Appends an index chunk; chunks of one index must arrive with indexes 0, 1, 2, ... in
+    // storage order, and the new chunk must be the next one.
+    void AddIndex(const TIndexChunk& chunk) {
+        ui32 chunkIdx = 0;
+        for (auto&& i : Indexes) {
+            if (i.GetIndexId() == chunk.GetIndexId()) {
+                AFL_VERIFY(chunkIdx == i.GetChunkIdx())("index_id", chunk.GetIndexId())("expected", chunkIdx)("real", i.GetChunkIdx());
+                ++chunkIdx;
+            }
+        }
+        AFL_VERIFY(chunkIdx == chunk.GetChunkIdx())("index_id", chunk.GetIndexId())("expected", chunkIdx)("real", chunk.GetChunkIdx());
+        Indexes.emplace_back(chunk);
+    }
+
+    // Validates the accumulated state and produces the TPortionInfo (defined in constructor.cpp).
+    TPortionInfo Build(const bool needChunksNormalization);
+};
+
+// Registry of portion constructors addressed by path id and then by portion id.
+class TPortionConstructors {
+private:
+    THashMap<ui64, THashMap<ui64, TPortionInfoConstructor>> Constructors;
+public:
+    // Iteration over the per-path maps.
+    THashMap<ui64, THashMap<ui64, TPortionInfoConstructor>>::iterator begin() {
+        return Constructors.begin();
+    }
+
+    THashMap<ui64, THashMap<ui64, TPortionInfoConstructor>>::iterator end() {
+        return Constructors.end();
+    }
+
+    // Returns the constructor registered for (pathId, portionId); it must exist.
+    TPortionInfoConstructor* GetConstructorVerified(const ui64 pathId, const ui64 portionId) {
+        auto itPathId = Constructors.find(pathId);
+        AFL_VERIFY(itPathId != Constructors.end());
+        auto itPortionId = itPathId->second.find(portionId);
+        AFL_VERIFY(itPortionId != itPathId->second.end());
+        return &itPortionId->second;
+    }
+
+    // Registers a new constructor; one with the same path id and portion id must not be
+    // registered already.
+    TPortionInfoConstructor* AddConstructorVerified(TPortionInfoConstructor&& constructor) {
+        // Both ids are read before the constructor is moved into the map.
+        const ui64 pathId = constructor.GetPathId();
+        const ui64 portionId = constructor.GetPortionIdVerified();
+        auto info = Constructors[pathId].emplace(portionId, std::move(constructor));
+        AFL_VERIFY(info.second);
+        return &info.first->second;
+    }
+
+    // Merges into the constructor already registered under the same ids, or registers the
+    // given one when there is none. Returns the registered constructor.
+    TPortionInfoConstructor* MergeConstructor(TPortionInfoConstructor&& constructor) {
+        const ui64 pathId = constructor.GetPathId();
+        const ui64 portionId = constructor.GetPortionIdVerified();
+        auto itPathId = Constructors.find(pathId);
+        if (itPathId != Constructors.end()) {
+            auto itPortionId = itPathId->second.find(portionId);
+            if (itPortionId != itPathId->second.end()) {
+                itPortionId->second.Merge(std::move(constructor));
+                return &itPortionId->second;
+            }
+        }
+        return AddConstructorVerified(std::move(constructor));
+    }
+
+};
+
+} \ No newline at end of file
diff --git a/ydb/core/tx/columnshard/engines/portions/constructor_meta.cpp b/ydb/core/tx/columnshard/engines/portions/constructor_meta.cpp
new file mode 100644
index 00000000000..3f88d1f0e1d
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/portions/constructor_meta.cpp
@@ -0,0 +1,105 @@
+#include "constructor_meta.h"
+#include <ydb/core/tx/columnshard/blobs_action/common/const.h>
+#include <ydb/core/tx/columnshard/engines/scheme/index_info.h>
+
+namespace NKikimr::NOlap {
+
+// Fills the primary-key borders and the min/max record snapshots. May be called only
+// while none of them is set.
+//   primaryKeys  - first/last key rows, rebuilt according to the replace key of indexInfo.
+//   snapshotKeys - batch whose first row supplies the min snapshot and whose last row
+//                  supplies the max snapshot; it must contain the plan-step and tx-id
+//                  special columns as UInt64 and at least one row.
+void TPortionMetaConstructor::FillMetaInfo(const NArrow::TFirstLastSpecialKeys& primaryKeys, const NArrow::TMinMaxSpecialKeys& snapshotKeys, const TIndexInfo& indexInfo) {
+    AFL_VERIFY(!FirstAndLastPK);
+    FirstAndLastPK = *primaryKeys.BuildAccordingToSchemaVerified(indexInfo.GetReplaceKey());
+    AFL_VERIFY(!RecordSnapshotMin);
+    AFL_VERIFY(!RecordSnapshotMax);
+    {
+        const auto& batch = snapshotKeys.GetBatch();
+        // GetView() is not bounds-checked, so reading row 0 / row (num_rows - 1) of an empty
+        // batch would be an out-of-bounds access; fail explicitly instead.
+        const auto rowsCount = batch->num_rows();
+        AFL_VERIFY(rowsCount > 0)("rows", rowsCount);
+        auto cPlanStep = batch->GetColumnByName(TIndexInfo::SPEC_COL_PLAN_STEP);
+        auto cTxId = batch->GetColumnByName(TIndexInfo::SPEC_COL_TX_ID);
+        Y_ABORT_UNLESS(cPlanStep && cTxId);
+        Y_ABORT_UNLESS(cPlanStep->type_id() == arrow::UInt64Type::type_id);
+        Y_ABORT_UNLESS(cTxId->type_id() == arrow::UInt64Type::type_id);
+        const arrow::UInt64Array& cPlanStepArray = static_cast<const arrow::UInt64Array&>(*cPlanStep);
+        const arrow::UInt64Array& cTxIdArray = static_cast<const arrow::UInt64Array&>(*cTxId);
+        const auto lastRow = rowsCount - 1;
+        RecordSnapshotMin = TSnapshot(cPlanStepArray.GetView(0), cTxIdArray.GetView(0));
+        RecordSnapshotMax = TSnapshot(cPlanStepArray.GetView(lastRow), cTxIdArray.GetView(lastRow));
+    }
+}
+
+// Initializes the constructor from existing portion metadata. Key borders, snapshots and
+// the optional tier name are always taken over; the statistics storage is taken only when
+// it is not empty, and the produced kind only when it is not UNSPECIFIED.
+TPortionMetaConstructor::TPortionMetaConstructor(const TPortionMeta& meta)
+    : FirstAndLastPK(meta.ReplaceKeyEdges)
+    , TierName(meta.GetTierNameOptional())
+    , RecordSnapshotMin(meta.RecordSnapshotMin)
+    , RecordSnapshotMax(meta.RecordSnapshotMax)
+{
+    if (!meta.StatisticsStorage.IsEmpty()) {
+        StatisticsStorage = meta.StatisticsStorage;
+    }
+    if (meta.Produced != NPortion::EProduced::UNSPECIFIED) {
+        Produced = meta.Produced;
+    }
+}
+
+// Produces a TPortionMeta. Key borders, both record snapshots and the produced kind are
+// mandatory; tier name and statistics storage are transferred only when set.
+NKikimr::NOlap::TPortionMeta TPortionMetaConstructor::Build() {
+    AFL_VERIFY(FirstAndLastPK);
+    AFL_VERIFY(RecordSnapshotMin);
+    AFL_VERIFY(RecordSnapshotMax);
+    TPortionMeta meta(*FirstAndLastPK, *RecordSnapshotMin, *RecordSnapshotMax);
+
+    // Optional parts.
+    if (TierName) {
+        meta.TierName = *TierName;
+    }
+    AFL_VERIFY(Produced);
+    meta.Produced = *Produced;
+    if (StatisticsStorage) {
+        meta.StatisticsStorage = *StatisticsStorage;
+    }
+    return meta;
+}
+
+// Loads metadata from its proto form.
+// Returns true on success, and also when metadata had already been loaded (the repeated
+// call is logged as an error and ignored). Returns false when the statistics storage
+// cannot be parsed or when no produced-kind flag is set in the proto; in the latter case
+// the statistics storage and tier name may already have been updated.
+// The primary key borders and both record snapshots must be present in the proto.
+bool TPortionMetaConstructor::LoadMetadata(const NKikimrTxColumnShard::TIndexPortionMeta& portionMeta, const TIndexInfo& indexInfo) {
+    if (Produced.has_value()) {
+        AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "DeserializeFromProto")("error", "parsing duplication");
+        return true;
+    }
+
+    if (portionMeta.HasStatisticsStorage()) {
+        auto parseResult = NStatistics::TPortionStorage::BuildFromProto(portionMeta.GetStatisticsStorage());
+        if (!parseResult) {
+            AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "DeserializeFromProto")("error", parseResult.GetErrorMessage());
+            return false;
+        }
+        StatisticsStorage = parseResult.DetachResult();
+        // An empty storage is represented as "not set".
+        if (StatisticsStorage->IsEmpty()) {
+            StatisticsStorage.reset();
+        }
+    }
+
+    if (portionMeta.GetTierName()) {
+        TierName = portionMeta.GetTierName();
+    }
+
+    // The first flag set in the proto, in this order, defines the produced kind.
+    if (portionMeta.GetIsInserted()) {
+        Produced = TPortionMeta::EProduced::INSERTED;
+    } else if (portionMeta.GetIsCompacted()) {
+        Produced = TPortionMeta::EProduced::COMPACTED;
+    } else if (portionMeta.GetIsSplitCompacted()) {
+        Produced = TPortionMeta::EProduced::SPLIT_COMPACTED;
+    } else if (portionMeta.GetIsEvicted()) {
+        Produced = TPortionMeta::EProduced::EVICTED;
+    } else {
+        AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "DeserializeFromProto")("error", "incorrect portion meta")("meta", portionMeta.DebugString());
+        return false;
+    }
+    AFL_VERIFY(Produced != TPortionMeta::EProduced::UNSPECIFIED);
+
+    AFL_VERIFY(portionMeta.HasPrimaryKeyBorders());
+    FirstAndLastPK = NArrow::TFirstLastSpecialKeys(portionMeta.GetPrimaryKeyBorders(), indexInfo.GetReplaceKey());
+
+    AFL_VERIFY(portionMeta.HasRecordSnapshotMin());
+    const auto& snapshotMin = portionMeta.GetRecordSnapshotMin();
+    RecordSnapshotMin = TSnapshot(snapshotMin.GetPlanStep(), snapshotMin.GetTxId());
+
+    AFL_VERIFY(portionMeta.HasRecordSnapshotMax());
+    const auto& snapshotMax = portionMeta.GetRecordSnapshotMax();
+    RecordSnapshotMax = TSnapshot(snapshotMax.GetPlanStep(), snapshotMax.GetTxId());
+    return true;
+}
+
+// Sets the tier name; may be called only while no tier name is stored. An empty name or
+// the default storage id leaves the tier name unset.
+void TPortionMetaConstructor::SetTierName(const TString& tierName) {
+    AFL_VERIFY(!TierName);
+    const bool hasCustomTier = !!tierName && tierName != NBlobOperations::TGlobal::DefaultStorageId;
+    if (hasCustomTier) {
+        TierName = tierName;
+    } else {
+        TierName.reset();
+    }
+}
+
+} \ No newline at end of file
diff --git a/ydb/core/tx/columnshard/engines/portions/constructor_meta.h b/ydb/core/tx/columnshard/engines/portions/constructor_meta.h
new file mode 100644
index 00000000000..b9603606a2c
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/portions/constructor_meta.h
@@ -0,0 +1,52 @@
+#pragma once
+#include "meta.h"
+#include <ydb/core/formats/arrow/special_keys.h>
+#include <ydb/core/tx/columnshard/common/portion.h>
+#include <ydb/core/tx/columnshard/common/snapshot.h>
+#include <ydb/core/tx/columnshard/engines/scheme/statistics/abstract/portion_storage.h>
+
+namespace NKikimr::NOlap {
+class TPortionInfoConstructor;
+struct TIndexInfo;
+
+// Builder for TPortionMeta: every piece of metadata is optional until Build() is called.
+class TPortionMetaConstructor {
+private:
+    std::optional<NArrow::TFirstLastSpecialKeys> FirstAndLastPK;
+    std::optional<TString> TierName;
+    std::optional<NStatistics::TPortionStorage> StatisticsStorage;
+    std::optional<TSnapshot> RecordSnapshotMin;
+    std::optional<TSnapshot> RecordSnapshotMax;
+    std::optional<NPortion::EProduced> Produced;
+    friend class TPortionInfoConstructor;
+    // Fills key borders and min/max record snapshots; reachable only from TPortionInfoConstructor.
+    void FillMetaInfo(const NArrow::TFirstLastSpecialKeys& primaryKeys, const NArrow::TMinMaxSpecialKeys& snapshotKeys, const TIndexInfo& indexInfo);
+
+public:
+    TPortionMetaConstructor() = default;
+    TPortionMetaConstructor(const TPortionMeta& meta);
+
+    // Sets the tier name; requires that none is stored yet.
+    void SetTierName(const TString& tierName);
+    // Drops any stored tier name and then sets the new one.
+    void ResetTierName(const TString& tierName) {
+        TierName = std::nullopt;
+        SetTierName(tierName);
+    }
+
+    // Stores the statistics storage; requires that none is stored yet.
+    void SetStatisticsStorage(NStatistics::TPortionStorage&& storage) {
+        AFL_VERIFY(!StatisticsStorage);
+        StatisticsStorage = std::move(storage);
+    }
+
+    // Stores the statistics storage, replacing a previously stored one if present.
+    void ResetStatisticsStorage(NStatistics::TPortionStorage&& storage) {
+        StatisticsStorage = std::move(storage);
+    }
+
+    // Records how the portion was produced.
+    void UpdateRecordsMeta(const NPortion::EProduced prod) {
+        Produced = prod;
+    }
+
+    TPortionMeta Build();
+
+    [[nodiscard]] bool LoadMetadata(const NKikimrTxColumnShard::TIndexPortionMeta& portionMeta, const TIndexInfo& indexInfo);
+
+};
+
+} \ No newline at end of file
diff --git a/ydb/core/tx/columnshard/engines/portions/meta.cpp b/ydb/core/tx/columnshard/engines/portions/meta.cpp
index 504ea4b3485..444172568d5 100644
--- a/ydb/core/tx/columnshard/engines/portions/meta.cpp
+++ b/ydb/core/tx/columnshard/engines/portions/meta.cpp
@@ -2,84 +2,18 @@
#include <ydb/core/formats/arrow/arrow_filter.h>
#include <ydb/core/tx/columnshard/engines/scheme/index_info.h>
+#include <ydb/core/tx/columnshard/blobs_action/common/const.h>
#include <ydb/library/actors/core/log.h>
namespace NKikimr::NOlap {
-void TPortionMeta::FillBatchInfo(const NArrow::TFirstLastSpecialKeys& primaryKeys, const NArrow::TMinMaxSpecialKeys& snapshotKeys, const TIndexInfo& indexInfo) {
- {
- ReplaceKeyEdges = primaryKeys.BuildAccordingToSchemaVerified(indexInfo.GetReplaceKey());
- IndexKeyStart = ReplaceKeyEdges->GetFirst();
- IndexKeyEnd = ReplaceKeyEdges->GetLast();
- AFL_VERIFY(IndexKeyStart);
- AFL_VERIFY(IndexKeyEnd);
- AFL_VERIFY(*IndexKeyStart <= *IndexKeyEnd)("start", IndexKeyStart->DebugString())("end", IndexKeyEnd->DebugString());
- }
-
- {
- auto cPlanStep = snapshotKeys.GetBatch()->GetColumnByName(TIndexInfo::SPEC_COL_PLAN_STEP);
- auto cTxId = snapshotKeys.GetBatch()->GetColumnByName(TIndexInfo::SPEC_COL_TX_ID);
- Y_ABORT_UNLESS(cPlanStep && cTxId);
- Y_ABORT_UNLESS(cPlanStep->type_id() == arrow::UInt64Type::type_id);
- Y_ABORT_UNLESS(cTxId->type_id() == arrow::UInt64Type::type_id);
- const arrow::UInt64Array& cPlanStepArray = static_cast<const arrow::UInt64Array&>(*cPlanStep);
- const arrow::UInt64Array& cTxIdArray = static_cast<const arrow::UInt64Array&>(*cTxId);
- RecordSnapshotMin = TSnapshot(cPlanStepArray.GetView(0), cTxIdArray.GetView(0));
- RecordSnapshotMax = TSnapshot(cPlanStepArray.GetView(snapshotKeys.GetBatch()->num_rows() - 1), cTxIdArray.GetView(snapshotKeys.GetBatch()->num_rows() - 1));
- }
-}
-
-bool TPortionMeta::DeserializeFromProto(const NKikimrTxColumnShard::TIndexPortionMeta& portionMeta, const TIndexInfo& indexInfo) {
- if (Produced != TPortionMeta::EProduced::UNSPECIFIED) {
- AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "DeserializeFromProto")("error", "parsing duplication");
- return true;
- }
- {
- auto parsed = NStatistics::TPortionStorage::BuildFromProto(portionMeta.GetStatisticsStorage());
- if (!parsed) {
- AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "DeserializeFromProto")("error", parsed.GetErrorMessage());
- return false;
- }
- StatisticsStorage = parsed.DetachResult();
- }
- TierName = portionMeta.GetTierName();
- if (portionMeta.GetIsInserted()) {
- Produced = TPortionMeta::EProduced::INSERTED;
- } else if (portionMeta.GetIsCompacted()) {
- Produced = TPortionMeta::EProduced::COMPACTED;
- } else if (portionMeta.GetIsSplitCompacted()) {
- Produced = TPortionMeta::EProduced::SPLIT_COMPACTED;
- } else if (portionMeta.GetIsEvicted()) {
- Produced = TPortionMeta::EProduced::EVICTED;
- } else {
- AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "DeserializeFromProto")("error", "incorrect portion meta")("meta", portionMeta.DebugString());
- return false;
- }
- Y_ABORT_UNLESS(Produced != TPortionMeta::EProduced::UNSPECIFIED);
-
- if (portionMeta.HasPrimaryKeyBorders()) {
- ReplaceKeyEdges = std::make_shared<NArrow::TFirstLastSpecialKeys>(portionMeta.GetPrimaryKeyBorders(), indexInfo.GetReplaceKey());
- IndexKeyStart = ReplaceKeyEdges->GetFirst();
- IndexKeyEnd = ReplaceKeyEdges->GetLast();
- AFL_VERIFY(IndexKeyStart);
- AFL_VERIFY(IndexKeyEnd);
- AFL_VERIFY (*IndexKeyStart <= *IndexKeyEnd)("start", IndexKeyStart->DebugString())("end", IndexKeyEnd->DebugString());
- }
-
- if (portionMeta.HasRecordSnapshotMin()) {
- RecordSnapshotMin = TSnapshot(portionMeta.GetRecordSnapshotMin().GetPlanStep(), portionMeta.GetRecordSnapshotMin().GetTxId());
- }
- if (portionMeta.HasRecordSnapshotMax()) {
- RecordSnapshotMax = TSnapshot(portionMeta.GetRecordSnapshotMax().GetPlanStep(), portionMeta.GetRecordSnapshotMax().GetTxId());
- }
- return true;
-}
-
NKikimrTxColumnShard::TIndexPortionMeta TPortionMeta::SerializeToProto() const {
NKikimrTxColumnShard::TIndexPortionMeta portionMeta;
portionMeta.SetTierName(TierName);
- *portionMeta.MutableStatisticsStorage() = StatisticsStorage.SerializeToProto();
+ if (!StatisticsStorage.IsEmpty()) {
+ *portionMeta.MutableStatisticsStorage() = StatisticsStorage.SerializeToProto();
+ }
switch (Produced) {
case TPortionMeta::EProduced::UNSPECIFIED:
Y_ABORT_UNLESS(false);
@@ -101,18 +35,10 @@ NKikimrTxColumnShard::TIndexPortionMeta TPortionMeta::SerializeToProto() const {
break;
}
- if (ReplaceKeyEdges) {
- portionMeta.SetPrimaryKeyBorders(ReplaceKeyEdges->SerializeToStringDataOnlyNoCompression());
- }
+ portionMeta.SetPrimaryKeyBorders(ReplaceKeyEdges.SerializeToStringDataOnlyNoCompression());
- if (RecordSnapshotMin) {
- portionMeta.MutableRecordSnapshotMin()->SetPlanStep(RecordSnapshotMin->GetPlanStep());
- portionMeta.MutableRecordSnapshotMin()->SetTxId(RecordSnapshotMin->GetTxId());
- }
- if (RecordSnapshotMax) {
- portionMeta.MutableRecordSnapshotMax()->SetPlanStep(RecordSnapshotMax->GetPlanStep());
- portionMeta.MutableRecordSnapshotMax()->SetTxId(RecordSnapshotMax->GetTxId());
- }
+ RecordSnapshotMin.SerializeToProto(*portionMeta.MutableRecordSnapshotMin());
+ RecordSnapshotMax.SerializeToProto(*portionMeta.MutableRecordSnapshotMax());
return portionMeta;
}
@@ -126,6 +52,14 @@ TString TPortionMeta::DebugString() const {
return sb;
}
+// Returns the tier name, or std::nullopt when it is empty or equals the default storage id.
+std::optional<TString> TPortionMeta::GetTierNameOptional() const {
+    const bool isDefaultTier = !TierName || TierName == NBlobOperations::TGlobal::DefaultStorageId;
+    if (isDefaultTier) {
+        return std::nullopt;
+    }
+    return TierName;
+}
+
TString TPortionAddress::DebugString() const {
return TStringBuilder() << "(path_id=" << PathId << ";portion_id=" << PortionId << ")";
}
diff --git a/ydb/core/tx/columnshard/engines/portions/meta.h b/ydb/core/tx/columnshard/engines/portions/meta.h
index 69ee81b114d..4d0dc65b1f0 100644
--- a/ydb/core/tx/columnshard/engines/portions/meta.h
+++ b/ydb/core/tx/columnshard/engines/portions/meta.h
@@ -14,56 +14,42 @@ struct TIndexInfo;
struct TPortionMeta {
private:
- std::shared_ptr<NArrow::TFirstLastSpecialKeys> ReplaceKeyEdges; // first and last PK rows
- YDB_ACCESSOR_DEF(TString, TierName);
+ NArrow::TFirstLastSpecialKeys ReplaceKeyEdges; // first and last PK rows
+ YDB_READONLY_DEF(TString, TierName);
YDB_READONLY_DEF(NStatistics::TPortionStorage, StatisticsStorage);
+ friend class TPortionMetaConstructor;
+ TPortionMeta(NArrow::TFirstLastSpecialKeys& pk, const TSnapshot& min, const TSnapshot& max)
+ : ReplaceKeyEdges(pk)
+ , IndexKeyStart(pk.GetFirst())
+ , IndexKeyEnd(pk.GetLast())
+ , RecordSnapshotMin(min)
+ , RecordSnapshotMax(max)
+ {
+ AFL_VERIFY(IndexKeyStart <= IndexKeyEnd)("start", IndexKeyStart.DebugString())("end", IndexKeyEnd.DebugString());
+ }
public:
using EProduced = NPortion::EProduced;
- std::optional<NArrow::TReplaceKey> IndexKeyStart;
- std::optional<NArrow::TReplaceKey> IndexKeyEnd;
+ NArrow::TReplaceKey IndexKeyStart;
+ NArrow::TReplaceKey IndexKeyEnd;
- std::optional<TSnapshot> RecordSnapshotMin;
- std::optional<TSnapshot> RecordSnapshotMax;
- EProduced Produced{EProduced::UNSPECIFIED};
-
- ui64 GetMetadataMemorySize() const {
- return sizeof(TPortionMeta) + ReplaceKeyEdges->GetMemorySize();
- }
+ TSnapshot RecordSnapshotMin;
+ TSnapshot RecordSnapshotMax;
+ EProduced Produced = EProduced::UNSPECIFIED;
- void SetStatisticsStorage(NStatistics::TPortionStorage&& storage) {
- AFL_VERIFY(StatisticsStorage.IsEmpty());
- StatisticsStorage = std::move(storage);
- }
+ std::optional<TString> GetTierNameOptional() const;
- void ResetStatisticsStorage(NStatistics::TPortionStorage&& storage) {
- StatisticsStorage = std::move(storage);
+ ui64 GetMetadataMemorySize() const {
+ return sizeof(TPortionMeta) + ReplaceKeyEdges.GetMemorySize();
}
- bool DeserializeFromProto(const NKikimrTxColumnShard::TIndexPortionMeta& portionMeta, const TIndexInfo& indexInfo);
-
NKikimrTxColumnShard::TIndexPortionMeta SerializeToProto() const;
- void FillBatchInfo(const NArrow::TFirstLastSpecialKeys& primaryKeys, const NArrow::TMinMaxSpecialKeys& snapshotKeys, const TIndexInfo& indexInfo);
-
EProduced GetProduced() const {
return Produced;
}
TString DebugString() const;
-
- friend IOutputStream& operator << (IOutputStream& out, const TPortionMeta& info) {
- out << info.DebugString();
- return out;
- }
-
- bool HasSnapshotMinMax() const {
- return !!RecordSnapshotMax && !!RecordSnapshotMin;
- }
-
- bool HasPrimaryKeyBorders() const {
- return !!IndexKeyStart && !!IndexKeyEnd;
- }
};
class TPortionAddress {
diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp
index d717f03c60e..0466bdac04b 100644
--- a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp
+++ b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp
@@ -1,4 +1,5 @@
#include "portion_info.h"
+#include "constructor.h"
#include <ydb/core/tx/columnshard/blobs_reader/task.h>
#include <ydb/core/tx/columnshard/data_sharing/protos/data.pb.h>
#include <ydb/core/tx/columnshard/engines/column_engine.h>
@@ -14,40 +15,6 @@
namespace NKikimr::NOlap {
-const TColumnRecord& TPortionInfo::AppendOneChunkColumn(TColumnRecord&& record) {
- Y_ABORT_UNLESS(record.ColumnId);
- std::optional<ui32> maxChunk;
- for (auto&& i : Records) {
- if (i.ColumnId == record.ColumnId) {
- if (!maxChunk) {
- maxChunk = i.Chunk;
- } else {
- Y_ABORT_UNLESS(*maxChunk + 1 == i.Chunk);
- maxChunk = i.Chunk;
- }
- }
- }
- if (maxChunk) {
- AFL_VERIFY(*maxChunk + 1 == record.Chunk)("max", *maxChunk)("record", record.Chunk);
- } else {
- AFL_VERIFY(0 == record.Chunk)("record", record.Chunk);
- }
- Records.emplace_back(std::move(record));
- return Records.back();
-}
-
-void TPortionInfo::AddMetadata(const ISnapshotSchema& snapshotSchema, const std::shared_ptr<arrow::RecordBatch>& batch, const TString& tierName) {
- Y_ABORT_UNLESS(batch->num_rows() == NumRows());
- AddMetadata(snapshotSchema, NArrow::TFirstLastSpecialKeys(NArrow::ExtractColumns(batch, snapshotSchema.GetIndexInfo().GetReplaceKey())),
- NArrow::TMinMaxSpecialKeys(batch, TIndexInfo::ArrowSchemaSnapshot()), tierName);
-}
-
-void TPortionInfo::AddMetadata(const ISnapshotSchema& snapshotSchema, const NArrow::TFirstLastSpecialKeys& primaryKeys, const NArrow::TMinMaxSpecialKeys& snapshotKeys, const TString& tierName) {
- const auto& indexInfo = snapshotSchema.GetIndexInfo();
- Meta.FillBatchInfo(primaryKeys, snapshotKeys, indexInfo);
- Meta.SetTierName(tierName);
-}
-
std::shared_ptr<arrow::Scalar> TPortionInfo::MaxValue(ui32 columnId) const {
std::shared_ptr<arrow::Scalar> result;
for (auto&& i : Records) {
@@ -130,18 +97,6 @@ TString TPortionInfo::DebugString(const bool withDetails) const {
return sb << ")";
}
-void TPortionInfo::LoadMetadata(const NKikimrTxColumnShard::TIndexPortionMeta& portionMeta, const TIndexInfo& indexInfo) {
- AFL_VERIFY(Meta.DeserializeFromProto(portionMeta, indexInfo));
-}
-
-void TPortionInfo::AddRecord(const TIndexInfo& indexInfo, const TColumnRecord& rec, const NKikimrTxColumnShard::TIndexPortionMeta* portionMeta) {
- Records.push_back(rec);
-
- if (portionMeta) {
- AFL_VERIFY(Meta.DeserializeFromProto(*portionMeta, indexInfo));
- }
-}
-
std::vector<const NKikimr::NOlap::TColumnRecord*> TPortionInfo::GetColumnChunksPointers(const ui32 columnId) const {
std::vector<const TColumnRecord*> result;
for (auto&& c : Records) {
@@ -154,21 +109,6 @@ std::vector<const NKikimr::NOlap::TColumnRecord*> TPortionInfo::GetColumnChunksP
return result;
}
-bool TPortionInfo::IsEqualWithSnapshots(const TPortionInfo& item) const {
- return PathId == item.PathId && MinSnapshotDeprecated == item.MinSnapshotDeprecated
- && Portion == item.Portion && RemoveSnapshot == item.RemoveSnapshot;
-}
-
-bool TPortionInfo::TakeSnapshots(const TPortionInfo& item) {
- if (MinSnapshotDeprecated.Valid()) {
- return PathId == item.PathId && MinSnapshotDeprecated == item.MinSnapshotDeprecated
- && Portion == item.Portion && RemoveSnapshot == item.RemoveSnapshot;
- } else {
- MinSnapshotDeprecated = item.MinSnapshotDeprecated;
- return PathId == item.PathId && Portion == item.Portion && RemoveSnapshot == item.RemoveSnapshot;
- }
-}
-
void TPortionInfo::RemoveFromDatabase(IDbWrapper& db) const {
for (auto& record : Records) {
db.EraseColumn(*this, record);
@@ -316,9 +256,6 @@ TConclusionStatus TPortionInfo::DeserializeFromProto(const NKikimrColumnShardDat
return parse;
}
}
- if (!Meta.DeserializeFromProto(proto.GetMeta(), info)) {
- return TConclusionStatus::Fail("cannot parse meta");
- }
for (auto&& i : proto.GetRecords()) {
auto parse = TColumnRecord::BuildFromProto(i, info.GetColumnFeaturesVerified(i.GetColumnId()));
if (!parse) {
@@ -337,7 +274,11 @@ TConclusionStatus TPortionInfo::DeserializeFromProto(const NKikimrColumnShardDat
}
TConclusion<TPortionInfo> TPortionInfo::BuildFromProto(const NKikimrColumnShardDataSharingProto::TPortionInfo& proto, const TIndexInfo& info) {
- TPortionInfo result;
+ TPortionMetaConstructor constructor;
+ if (!constructor.LoadMetadata(proto.GetMeta(), info)) {
+ return TConclusionStatus::Fail("cannot parse meta");
+ }
+ TPortionInfo result(constructor.Build());
auto parse = result.DeserializeFromProto(proto, info);
if (!parse) {
return parse;
@@ -466,19 +407,6 @@ void TPortionInfo::FillBlobIdsByStorage(THashMap<TString, THashSet<TUnifiedBlobI
return FillBlobIdsByStorage(result, schema->GetIndexInfo());
}
-TBlobRangeLink16::TLinkId TPortionInfo::RegisterBlobId(const TUnifiedBlobId& blobId) {
- AFL_VERIFY(blobId.IsValid());
- TBlobRangeLink16::TLinkId idx = 0;
- for (auto&& i : BlobIds) {
- if (i == blobId) {
- return idx;
- }
- ++idx;
- }
- BlobIds.emplace_back(blobId);
- return idx;
-}
-
THashMap<TString, THashMap<TUnifiedBlobId, std::vector<std::shared_ptr<IPortionDataChunk>>>> TPortionInfo::RestoreEntityChunks(NBlobOperations::NRead::TCompositeReadBlobs& blobs, const TIndexInfo& indexInfo) const {
THashMap<TString, THashMap<TUnifiedBlobId, std::vector<std::shared_ptr<IPortionDataChunk>>>> result;
for (auto&& c : GetRecords()) {
@@ -719,6 +647,15 @@ public:
};
}
+ISnapshotSchema::TPtr TPortionInfo::TSchemaCursor::GetSchema(const TPortionInfoConstructor& portion) {
+ if (!CurrentSchema || portion.GetMinSnapshotDeprecatedVerified() != LastSnapshot) {
+ CurrentSchema = portion.GetSchema(VersionedIndex);
+ LastSnapshot = portion.GetMinSnapshotDeprecatedVerified();
+ }
+ AFL_VERIFY(!!CurrentSchema);
+ return CurrentSchema;
+}
+
NArrow::NAccessor::IChunkedArray::TCurrentChunkAddress TDeserializeChunkedArray::DoGetChunk(const std::optional<TCurrentChunkAddress>& chunkCurrent, const ui64 position) const {
TChunkAccessor accessor(Chunks, Loader);
return SelectChunk(chunkCurrent, position, accessor);
diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.h b/ydb/core/tx/columnshard/engines/portions/portion_info.h
index 86c5617f067..966d6363941 100644
--- a/ydb/core/tx/columnshard/engines/portions/portion_info.h
+++ b/ydb/core/tx/columnshard/engines/portions/portion_info.h
@@ -22,6 +22,7 @@ namespace NKikimr::NOlap {
namespace NBlobOperations::NRead {
class TCompositeReadBlobs;
}
+class TPortionInfoConstructor;
struct TIndexInfo;
class TVersionedIndex;
@@ -99,6 +100,8 @@ public:
}
};
+class TPortionInfoConstructor;
+
class TPortionInfo {
public:
using TRuntimeFeatures = ui8;
@@ -106,7 +109,12 @@ public:
Optimized = 1 /* "optimized" */
};
private:
- TPortionInfo() = default;
+ friend class TPortionInfoConstructor;
+ TPortionInfo(TPortionMeta&& meta)
+ : Meta(std::move(meta))
+ {
+
+ }
ui64 PathId = 0;
ui64 Portion = 0; // Id of independent (overlayed by PK) portion of data in pathId
TSnapshot MinSnapshotDeprecated = TSnapshot::Zero(); // {PlanStep, TxId} is min snapshot for {Granule, Portion}
@@ -114,7 +122,6 @@ private:
std::optional<ui64> SchemaVersion;
TPortionMeta Meta;
- ui64 DeprecatedGranuleId = 0;
YDB_READONLY_DEF(std::vector<TIndexChunk>, Indexes);
YDB_READONLY(TRuntimeFeatures, RuntimeFeatures, 0);
std::vector<TUnifiedBlobId> BlobIds;
@@ -165,6 +172,15 @@ private:
public:
ui64 GetMinMemoryForReadColumns(const std::optional<std::set<ui32>>& columnIds) const;
+ void SetRemoveSnapshot(const TSnapshot& snap) {
+ AFL_VERIFY(!RemoveSnapshot.Valid());
+ RemoveSnapshot = snap;
+ }
+
+ void SetRemoveSnapshot(const ui64 planStep, const ui64 txId) {
+ SetRemoveSnapshot(TSnapshot(planStep, txId));
+ }
+
void InitRuntimeFeature(const ERuntimeFeature feature, const bool activity) {
if (activity) {
AddRuntimeFeature(feature);
@@ -181,11 +197,6 @@ public:
RuntimeFeatures &= (Max<TRuntimeFeatures>() - (TRuntimeFeatures)feature);
}
- void OnAfterLoad() const {
- CheckChunksOrder(Records);
- CheckChunksOrder(Indexes);
- }
-
bool HasRuntimeFeature(const ERuntimeFeature feature) const {
if (feature == ERuntimeFeature::Optimized) {
if ((RuntimeFeatures & (TRuntimeFeatures)feature)) {
@@ -230,10 +241,6 @@ public:
THashMap<TChunkAddress, TString> DecodeBlobAddresses(NBlobOperations::NRead::TCompositeReadBlobs&& blobs, const TIndexInfo& indexInfo) const;
- void SetStatisticsStorage(NStatistics::TPortionStorage&& storage) {
- Meta.SetStatisticsStorage(std::move(storage));
- }
-
const TString& GetColumnStorageId(const ui32 columnId, const TIndexInfo& indexInfo) const;
const TString& GetEntityStorageId(const ui32 entityId, const TIndexInfo& indexInfo) const;
@@ -277,40 +284,10 @@ public:
return PathId;
}
- TBlobRangeLink16::TLinkId RegisterBlobId(const TUnifiedBlobId& blobId);
-
- void RegisterBlobIdx(const TChunkAddress& address, const TBlobRangeLink16::TLinkId blobIdx) {
- for (auto it = Records.begin(); it != Records.end(); ++it) {
- if (it->ColumnId == address.GetEntityId() && it->Chunk == address.GetChunkIdx()) {
- it->RegisterBlobIdx(blobIdx);
- return;
- }
- }
- for (auto it = Indexes.begin(); it != Indexes.end(); ++it) {
- if (it->GetIndexId() == address.GetEntityId() && it->GetChunkIdx() == address.GetChunkIdx()) {
- it->RegisterBlobIdx(blobIdx);
- return;
- }
- }
- AFL_VERIFY(false)("problem", "portion haven't address for blob registration")("address", address.DebugString());
- }
-
void RemoveFromDatabase(IDbWrapper& db) const;
void SaveToDatabase(IDbWrapper& db, const ui32 firstPKColumnId, const bool saveOnlyMeta) const;
- void AddIndex(const TIndexChunk& chunk) {
- ui32 chunkIdx = 0;
- for (auto&& i : Indexes) {
- if (i.GetIndexId() == chunk.GetIndexId()) {
- AFL_VERIFY(chunkIdx == i.GetChunkIdx())("index_id", chunk.GetIndexId())("expected", chunkIdx)("real", i.GetChunkIdx());
- ++chunkIdx;
- }
- }
- AFL_VERIFY(chunkIdx == chunk.GetChunkIdx())("index_id", chunk.GetIndexId())("expected", chunkIdx)("real", chunk.GetChunkIdx());
- Indexes.emplace_back(chunk);
- }
-
bool OlderThen(const TPortionInfo& info) const {
return RecordSnapshotMin() < info.RecordSnapshotMin();
}
@@ -376,10 +353,6 @@ public:
return result;
}
- void ResetMeta() {
- Meta = TPortionMeta();
- }
-
const TPortionMeta& GetMeta() const {
return Meta;
}
@@ -427,7 +400,7 @@ public:
bool Empty() const { return Records.empty(); }
bool Produced() const { return Meta.GetProduced() != TPortionMeta::EProduced::UNSPECIFIED; }
- bool Valid() const { return ValidSnapshotInfo() && !Empty() && Produced() && Meta.IndexKeyStart && Meta.IndexKeyEnd; }
+ bool Valid() const { return ValidSnapshotInfo() && !Empty() && Produced(); }
bool ValidSnapshotInfo() const { return MinSnapshotDeprecated.Valid() && PathId && Portion; }
bool IsInserted() const { return Meta.GetProduced() == TPortionMeta::EProduced::INSERTED; }
bool IsEvicted() const { return Meta.GetProduced() == TPortionMeta::EProduced::EVICTED; }
@@ -435,28 +408,6 @@ public:
bool CanIntersectOthers() const { return !Valid() || IsInserted() || IsEvicted(); }
size_t NumChunks() const { return Records.size(); }
- TPortionInfo CopyBeforeChunksRebuild() const {
- TPortionInfo result = *this;
- result.Records.clear();
- result.Indexes.clear();
- result.BlobIds.clear();
- return result;
- }
-
- bool IsEqualWithSnapshots(const TPortionInfo& item) const;
- bool TakeSnapshots(const TPortionInfo& item);
-
- static TPortionInfo BuildEmpty() {
- return TPortionInfo();
- }
-
- TPortionInfo(const ui64 pathId, const ui64 portionId, const ui64 schemaVersion, const TSnapshot& minSnapshot)
- : PathId(pathId)
- , Portion(portionId)
- , MinSnapshotDeprecated(minSnapshot)
- , SchemaVersion(schemaVersion) {
- }
-
TString DebugString(const bool withDetails = false) const;
bool HasRemoveSnapshot() const {
@@ -479,19 +430,10 @@ public:
return HasRemoveSnapshot();
}
- bool AllowEarlyFilter() const {
- return Meta.GetProduced() == TPortionMeta::EProduced::COMPACTED
- || Meta.GetProduced() == TPortionMeta::EProduced::SPLIT_COMPACTED;
- }
-
ui64 GetPortion() const {
return Portion;
}
- ui64 GetDeprecatedGranuleId() const {
- return DeprecatedGranuleId;
- }
-
TPortionAddress GetAddress() const {
return TPortionAddress(PathId, Portion);
}
@@ -504,9 +446,6 @@ public:
Portion = portion;
}
- void SetDeprecatedGranuleId(const ui64 granuleId) {
- DeprecatedGranuleId = granuleId;
- }
const TSnapshot& GetMinSnapshotDeprecated() const {
return MinSnapshotDeprecated;
@@ -530,26 +469,8 @@ public:
return SchemaVersion.value();
}
- void SetMinSnapshotDeprecated(const TSnapshot& snap) {
- Y_ABORT_UNLESS(snap.Valid());
- MinSnapshotDeprecated = snap;
- }
-
- void SetSchemaVersion(const ui64 version) {
- AFL_VERIFY(version);
- SchemaVersion = version;
- }
-
- void SetRemoveSnapshot(const TSnapshot& snap) {
- const bool wasValid = RemoveSnapshot.Valid();
- Y_ABORT_UNLESS(!wasValid || snap.Valid());
- RemoveSnapshot = snap;
- }
-
- void SetRemoveSnapshot(const ui64 planStep, const ui64 txId) {
- const bool wasValid = RemoveSnapshot.Valid();
- RemoveSnapshot = TSnapshot(planStep, txId);
- Y_ABORT_UNLESS(!wasValid || RemoveSnapshot.Valid());
+ std::optional<ui64> GetSchemaVersionOptional() const {
+ return SchemaVersion;
}
bool IsVisible(const TSnapshot& snapshot) const {
@@ -566,38 +487,22 @@ public:
return visible;
}
- void UpdateRecordsMeta(TPortionMeta::EProduced produced) {
- Meta.Produced = produced;
- }
-
- void AddRecord(const TIndexInfo& indexInfo, const TColumnRecord& rec, const NKikimrTxColumnShard::TIndexPortionMeta* portionMeta);
-
- void LoadMetadata(const NKikimrTxColumnShard::TIndexPortionMeta& portionMeta, const TIndexInfo& indexInfo);
- void AddMetadata(const ISnapshotSchema& snapshotSchema, const std::shared_ptr<arrow::RecordBatch>& batch,
- const TString& tierName);
- void AddMetadata(const ISnapshotSchema& snapshotSchema, const NArrow::TFirstLastSpecialKeys& primaryKeys, const NArrow::TMinMaxSpecialKeys& snapshotKeys,
- const TString& tierName);
-
std::shared_ptr<arrow::Scalar> MaxValue(ui32 columnId) const;
const NArrow::TReplaceKey& IndexKeyStart() const {
- Y_ABORT_UNLESS(Meta.IndexKeyStart);
- return *Meta.IndexKeyStart;
+ return Meta.IndexKeyStart;
}
const NArrow::TReplaceKey& IndexKeyEnd() const {
- Y_ABORT_UNLESS(Meta.IndexKeyEnd);
- return *Meta.IndexKeyEnd;
+ return Meta.IndexKeyEnd;
}
const TSnapshot& RecordSnapshotMin() const {
- Y_ABORT_UNLESS(Meta.RecordSnapshotMin);
- return *Meta.RecordSnapshotMin;
+ return Meta.RecordSnapshotMin;
}
const TSnapshot& RecordSnapshotMax() const {
- Y_ABORT_UNLESS(Meta.RecordSnapshotMax);
- return *Meta.RecordSnapshotMax;
+ return Meta.RecordSnapshotMax;
}
@@ -616,10 +521,12 @@ public:
: VersionedIndex(versionedIndex)
{}
+ ISnapshotSchema::TPtr GetSchema(const TPortionInfoConstructor& portion);
+
ISnapshotSchema::TPtr GetSchema(const TPortionInfo& portion) {
if (!CurrentSchema || portion.MinSnapshotDeprecated != LastSnapshot) {
CurrentSchema = portion.GetSchema(VersionedIndex);
- LastSnapshot = portion.GetMinSnapshotDeprecated();
+ LastSnapshot = portion.MinSnapshotDeprecated;
}
AFL_VERIFY(!!CurrentSchema)("portion", portion.DebugString());
return CurrentSchema;
@@ -881,8 +788,6 @@ public:
return batch;
}
- const TColumnRecord& AppendOneChunkColumn(TColumnRecord&& record);
-
friend IOutputStream& operator << (IOutputStream& out, const TPortionInfo& info) {
out << info.DebugString();
return out;
diff --git a/ydb/core/tx/columnshard/engines/portions/read_with_blobs.cpp b/ydb/core/tx/columnshard/engines/portions/read_with_blobs.cpp
index 09b98597378..3c7e700e7b4 100644
--- a/ydb/core/tx/columnshard/engines/portions/read_with_blobs.cpp
+++ b/ydb/core/tx/columnshard/engines/portions/read_with_blobs.cpp
@@ -1,11 +1,9 @@
#include "read_with_blobs.h"
#include "write_with_blobs.h"
#include <ydb/core/tx/columnshard/engines/scheme/index_info.h>
-#include <ydb/core/tx/columnshard/engines/scheme/filtered_scheme.h>
-#include <ydb/core/tx/columnshard/engines/column_engine.h>
-#include <ydb/core/tx/columnshard/blobs_reader/task.h>
-#include <ydb/core/tx/columnshard/splitter/batch_slice.h>
+#include <ydb/core/tx/columnshard/engines/scheme/versions/filtered_scheme.h>
#include <ydb/core/tx/columnshard/hooks/abstract/abstract.h>
+#include <ydb/core/tx/columnshard/splitter/batch_slice.h>
namespace NKikimr::NOlap {
@@ -151,10 +149,10 @@ std::optional<TWritePortionInfoWithBlobs> TReadPortionInfoWithBlobs::SyncPortion
auto schemaTo = std::make_shared<TDefaultSchemaDetails>(to, std::make_shared<TSerializationStats>());
TGeneralSerializedSlice slice(entityChunksNew, schemaTo, counters);
const NSplitter::TEntityGroups groups = to->GetIndexInfo().GetEntityGroupsByStorageId(targetTier, *storages);
- TWritePortionInfoWithBlobs result = TWritePortionInfoWithBlobs::BuildByBlobs(slice.GroupChunksByBlobs(groups), source.PortionInfo, storages);
- result.GetPortionInfo().SetMinSnapshotDeprecated(to->GetSnapshot());
- result.GetPortionInfo().SetSchemaVersion(to->GetVersion());
- result.GetPortionInfo().MutableMeta().SetTierName(targetTier);
+ TPortionInfoConstructor constructor(source.PortionInfo, false, true);
+ constructor.SetMinSnapshotDeprecated(to->GetSnapshot());
+ constructor.SetSchemaVersion(to->GetVersion());
+ constructor.MutableMeta().ResetTierName(targetTier);
NStatistics::TPortionStorage storage;
for (auto&& i : to->GetIndexInfo().GetStatisticsByName()) {
@@ -165,7 +163,9 @@ std::optional<TWritePortionInfoWithBlobs> TReadPortionInfoWithBlobs::SyncPortion
i.second->FillStatisticsData(entityChunksNew, storage, to->GetIndexInfo());
}
}
- result.MutablePortionInfo().MutableMeta().ResetStatisticsStorage(std::move(storage));
+ constructor.MutableMeta().ResetStatisticsStorage(std::move(storage));
+
+ TWritePortionInfoWithBlobs result = TWritePortionInfoWithBlobs::BuildByBlobs(slice.GroupChunksByBlobs(groups), std::move(constructor), storages);
return result;
}
diff --git a/ydb/core/tx/columnshard/engines/portions/read_with_blobs.h b/ydb/core/tx/columnshard/engines/portions/read_with_blobs.h
index 2c8553f9884..6d688db6607 100644
--- a/ydb/core/tx/columnshard/engines/portions/read_with_blobs.h
+++ b/ydb/core/tx/columnshard/engines/portions/read_with_blobs.h
@@ -1,13 +1,13 @@
#pragma once
#include "base_with_blobs.h"
+#include "common.h"
#include "portion_info.h"
-#include <ydb/core/tx/columnshard/blob.h>
-#include <ydb/core/tx/columnshard/splitter/blob_info.h>
-#include <ydb/core/tx/columnshard/splitter/chunks.h>
-#include <ydb/core/tx/columnshard/engines/scheme/statistics/abstract/common.h>
-#include <ydb/core/tx/columnshard/engines/scheme/statistics/abstract/operator.h>
+
+#include <ydb/core/tx/columnshard/splitter/abstract/chunks.h>
#include <ydb/library/accessor/accessor.h>
+#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h>
+#include <map>
namespace NKikimr::NOlap {
diff --git a/ydb/core/tx/columnshard/engines/portions/write_with_blobs.cpp b/ydb/core/tx/columnshard/engines/portions/write_with_blobs.cpp
index 99929745437..4fbc7cb3305 100644
--- a/ydb/core/tx/columnshard/engines/portions/write_with_blobs.cpp
+++ b/ydb/core/tx/columnshard/engines/portions/write_with_blobs.cpp
@@ -1,10 +1,5 @@
#include "write_with_blobs.h"
#include <ydb/core/tx/columnshard/engines/scheme/index_info.h>
-#include <ydb/core/tx/columnshard/engines/scheme/filtered_scheme.h>
-#include <ydb/core/tx/columnshard/engines/column_engine.h>
-#include <ydb/core/tx/columnshard/blobs_reader/task.h>
-#include <ydb/core/tx/columnshard/splitter/batch_slice.h>
-#include <ydb/core/tx/columnshard/hooks/abstract/abstract.h>
namespace NKikimr::NOlap {
@@ -19,25 +14,27 @@ void TWritePortionInfoWithBlobs::TBlobInfo::AddChunk(TWritePortionInfoWithBlobs&
Y_ABORT_UNLESS(Chunks.emplace(chunk->GetChunkAddressVerified(), chunk).second);
ChunksOrdered.emplace_back(chunk);
- chunk->AddIntoPortionBeforeBlob(bRange, owner.PortionInfo);
+ chunk->AddIntoPortionBeforeBlob(bRange, owner.GetPortionConstructor());
}
void TWritePortionInfoWithBlobs::TBlobInfo::RegisterBlobId(TWritePortionInfoWithBlobs& owner, const TUnifiedBlobId& blobId) {
- const TBlobRangeLink16::TLinkId idx = owner.PortionInfo.RegisterBlobId(blobId);
+ const TBlobRangeLink16::TLinkId idx = owner.GetPortionConstructor().RegisterBlobId(blobId);
for (auto&& i : Chunks) {
- owner.PortionInfo.RegisterBlobIdx(i.first, idx);
+ owner.GetPortionConstructor().RegisterBlobIdx(i.first, idx);
}
}
TWritePortionInfoWithBlobs TWritePortionInfoWithBlobs::BuildByBlobs(std::vector<TSplittedBlob>&& chunks,
const ui64 granule, const ui64 schemaVersion, const TSnapshot& snapshot, const std::shared_ptr<IStoragesManager>& operators)
{
- return BuildByBlobs(std::move(chunks), TPortionInfo(granule, 0, schemaVersion, snapshot), operators);
+ TPortionInfoConstructor constructor(granule);
+ constructor.SetMinSnapshotDeprecated(snapshot);
+ constructor.SetSchemaVersion(schemaVersion);
+ return BuildByBlobs(std::move(chunks), std::move(constructor), operators);
}
-TWritePortionInfoWithBlobs TWritePortionInfoWithBlobs::BuildByBlobs(std::vector<TSplittedBlob>&& chunks, const TPortionInfo& basePortion,
- const std::shared_ptr<IStoragesManager>& operators) {
- TWritePortionInfoWithBlobs result(basePortion.CopyBeforeChunksRebuild());
+TWritePortionInfoWithBlobs TWritePortionInfoWithBlobs::BuildByBlobs(std::vector<TSplittedBlob>&& chunks, TPortionInfoConstructor&& constructor, const std::shared_ptr<IStoragesManager>& operators) {
+ TWritePortionInfoWithBlobs result(std::move(constructor));
for (auto&& blob : chunks) {
auto storage = operators->GetOperatorVerified(blob.GetGroupName());
auto blobInfo = result.StartBlob(storage);
@@ -45,7 +42,6 @@ TWritePortionInfoWithBlobs TWritePortionInfoWithBlobs::BuildByBlobs(std::vector<
blobInfo.AddChunk(chunk);
}
}
- result.GetPortionInfo().ReorderChunks();
return result;
}
@@ -75,7 +71,7 @@ void TWritePortionInfoWithBlobs::FillStatistics(const TIndexInfo& index) {
}
i.second->FillStatisticsData(data, storage, index);
}
- PortionInfo.SetStatisticsStorage(std::move(storage));
+ GetPortionConstructor().MutableMeta().SetStatisticsStorage(std::move(storage));
}
}
diff --git a/ydb/core/tx/columnshard/engines/portions/write_with_blobs.h b/ydb/core/tx/columnshard/engines/portions/write_with_blobs.h
index c84d97bd06c..39c3bb885fe 100644
--- a/ydb/core/tx/columnshard/engines/portions/write_with_blobs.h
+++ b/ydb/core/tx/columnshard/engines/portions/write_with_blobs.h
@@ -1,13 +1,11 @@
#pragma once
#include "base_with_blobs.h"
-#include "portion_info.h"
-#include <ydb/core/tx/columnshard/blob.h>
-#include <ydb/core/tx/columnshard/splitter/blob_info.h>
-#include <ydb/core/tx/columnshard/splitter/chunks.h>
-#include <ydb/core/tx/columnshard/engines/scheme/statistics/abstract/common.h>
-#include <ydb/core/tx/columnshard/engines/scheme/statistics/abstract/operator.h>
+#include "constructor.h"
#include <ydb/library/accessor/accessor.h>
+#include <ydb/core/tx/columnshard/blobs_action/abstract/storages_manager.h>
+#include <ydb/core/tx/columnshard/common/snapshot.h>
+#include <ydb/core/tx/columnshard/splitter/blob_info.h>
namespace NKikimr::NOlap {
@@ -63,15 +61,12 @@ public:
void RegisterBlobId(TWritePortionInfoWithBlobs& owner, const TUnifiedBlobId& blobId);
};
private:
- TPortionInfo PortionInfo;
+ std::optional<TPortionInfoConstructor> PortionConstructor;
+ std::optional<TPortionInfo> PortionResult;
YDB_READONLY_DEF(std::vector<TBlobInfo>, Blobs);
- explicit TWritePortionInfoWithBlobs(TPortionInfo&& portionInfo)
- : PortionInfo(std::move(portionInfo)) {
- }
-
- explicit TWritePortionInfoWithBlobs(const TPortionInfo& portionInfo)
- : PortionInfo(portionInfo) {
+ explicit TWritePortionInfoWithBlobs(TPortionInfoConstructor&& portionConstructor)
+ : PortionConstructor(std::move(portionConstructor)) {
}
TBlobInfo::TBuilder StartBlob(const std::shared_ptr<IBlobsStorageOperator>& bOperator) {
@@ -80,10 +75,6 @@ private:
}
public:
- TPortionInfo& MutablePortionInfo() {
- return PortionInfo;
- }
-
std::vector<std::shared_ptr<IPortionDataChunk>> GetEntityChunks(const ui32 entityId) const;
void FillStatistics(const TIndexInfo& index);
@@ -91,8 +82,8 @@ public:
static TWritePortionInfoWithBlobs BuildByBlobs(std::vector<TSplittedBlob>&& chunks,
const ui64 granule, const ui64 schemaVersion, const TSnapshot& snapshot, const std::shared_ptr<IStoragesManager>& operators);
- static TWritePortionInfoWithBlobs BuildByBlobs(std::vector<TSplittedBlob>&& chunks, const TPortionInfo& basePortion,
- const std::shared_ptr<IStoragesManager>& operators);
+ static TWritePortionInfoWithBlobs BuildByBlobs(std::vector<TSplittedBlob>&& chunks,
+ TPortionInfoConstructor&& constructor, const std::shared_ptr<IStoragesManager>& operators);
const TString& GetBlobByRangeVerified(const ui32 columnId, const ui32 chunkId) const {
for (auto&& b : Blobs) {
@@ -123,21 +114,28 @@ public:
}
TString DebugString() const {
- return TStringBuilder() << PortionInfo.DebugString() << ";blobs_count=" << Blobs.size() << ";";
+ return TStringBuilder() << "blobs_count=" << Blobs.size() << ";";
}
- const TPortionInfo& GetPortionInfo() const {
- return PortionInfo;
+ void FinalizePortionConstructor() {
+ AFL_VERIFY(!!PortionConstructor);
+ AFL_VERIFY(!PortionResult);
+ PortionResult = PortionConstructor->Build(true);
+ PortionConstructor.reset();
}
- TPortionInfo& GetPortionInfo() {
- return PortionInfo;
+ const TPortionInfo& GetPortionResult() const {
+ AFL_VERIFY(!PortionConstructor);
+ AFL_VERIFY(!!PortionResult);
+ return *PortionResult;
}
- friend IOutputStream& operator << (IOutputStream& out, const TWritePortionInfoWithBlobs& info) {
- out << info.DebugString();
- return out;
+ TPortionInfoConstructor& GetPortionConstructor() {
+ AFL_VERIFY(!!PortionConstructor);
+ AFL_VERIFY(!PortionResult);
+ return *PortionConstructor;
}
+
};
} // namespace NKikimr::NOlap
diff --git a/ydb/core/tx/columnshard/engines/portions/ya.make b/ydb/core/tx/columnshard/engines/portions/ya.make
index 7143d9fc5ea..d9f35dbbac8 100644
--- a/ydb/core/tx/columnshard/engines/portions/ya.make
+++ b/ydb/core/tx/columnshard/engines/portions/ya.make
@@ -6,6 +6,8 @@ SRCS(
base_with_blobs.cpp
read_with_blobs.cpp
write_with_blobs.cpp
+ constructor.cpp
+ constructor_meta.cpp
meta.cpp
common.cpp
index_chunk.cpp
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/read_metadata.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/read_metadata.h
index 8be1cf19a97..371a09d7310 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/read_metadata.h
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/read_metadata.h
@@ -74,7 +74,8 @@ public:
<< " at snapshot: " << GetRequestSnapshot().DebugString();
TBase::Dump(out);
if (SelectInfo) {
- out << ", " << *SelectInfo;
+ out << ", ";
+ SelectInfo->DebugStream(out);
}
}
diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/meta.cpp b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/meta.cpp
index f645bd4a308..2d7b8390aca 100644
--- a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/meta.cpp
+++ b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/meta.cpp
@@ -1,6 +1,7 @@
#include "meta.h"
#include <ydb/core/tx/columnshard/blobs_action/abstract/storages_manager.h>
#include <ydb/core/tx/columnshard/engines/portions/column_record.h>
+#include <ydb/core/tx/columnshard/engines/portions/constructor.h>
#include <ydb/core/tx/columnshard/engines/portions/portion_info.h>
#include <ydb/core/tx/columnshard/engines/scheme/index_info.h>
#include <ydb/core/tx/columnshard/engines/storage/chunks/data.h>
@@ -11,7 +12,7 @@
namespace NKikimr::NOlap::NIndexes {
-void TPortionIndexChunk::DoAddIntoPortionBeforeBlob(const TBlobRangeLink16& bRange, TPortionInfo& portionInfo) const {
+void TPortionIndexChunk::DoAddIntoPortionBeforeBlob(const TBlobRangeLink16& bRange, TPortionInfoConstructor& portionInfo) const {
AFL_VERIFY(!bRange.IsValid());
portionInfo.AddIndex(TIndexChunk(GetEntityId(), GetChunkIdxVerified(), RecordsCount, RawBytes, bRange));
}
diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/meta.h b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/meta.h
index 1f414cdc030..95997072e43 100644
--- a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/meta.h
+++ b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/meta.h
@@ -124,7 +124,7 @@ protected:
virtual std::shared_ptr<arrow::Scalar> DoGetLastScalar() const override {
return nullptr;
}
- virtual void DoAddIntoPortionBeforeBlob(const TBlobRangeLink16& bRange, TPortionInfo& portionInfo) const override;
+ virtual void DoAddIntoPortionBeforeBlob(const TBlobRangeLink16& bRange, TPortionInfoConstructor& portionInfo) const override;
public:
TPortionIndexChunk(const TChunkAddress& address, const ui32 recordsCount, const ui64 rawBytes, const TString& data)
: TBase(address.GetColumnId(), address.GetChunkIdx())
diff --git a/ydb/core/tx/columnshard/engines/storage/chunks/data.cpp b/ydb/core/tx/columnshard/engines/storage/chunks/data.cpp
index 373c88a765f..79613b5b979 100644
--- a/ydb/core/tx/columnshard/engines/storage/chunks/data.cpp
+++ b/ydb/core/tx/columnshard/engines/storage/chunks/data.cpp
@@ -1,9 +1,10 @@
#include "data.h"
#include <ydb/core/tx/columnshard/engines/portions/portion_info.h>
+#include <ydb/core/tx/columnshard/engines/portions/constructor.h>
namespace NKikimr::NOlap::NChunks {
-void TPortionIndexChunk::DoAddIntoPortionBeforeBlob(const TBlobRangeLink16& bRange, TPortionInfo& portionInfo) const {
+void TPortionIndexChunk::DoAddIntoPortionBeforeBlob(const TBlobRangeLink16& bRange, TPortionInfoConstructor& portionInfo) const {
AFL_VERIFY(!bRange.IsValid());
portionInfo.AddIndex(TIndexChunk(GetEntityId(), GetChunkIdxVerified(), RecordsCount, RawBytes, bRange));
}
diff --git a/ydb/core/tx/columnshard/engines/storage/chunks/data.h b/ydb/core/tx/columnshard/engines/storage/chunks/data.h
index dfa9189e560..d5a91c19609 100644
--- a/ydb/core/tx/columnshard/engines/storage/chunks/data.h
+++ b/ydb/core/tx/columnshard/engines/storage/chunks/data.h
@@ -31,7 +31,7 @@ protected:
virtual std::shared_ptr<arrow::Scalar> DoGetLastScalar() const override {
return nullptr;
}
- virtual void DoAddIntoPortionBeforeBlob(const TBlobRangeLink16& bRange, TPortionInfo& portionInfo) const override;
+ virtual void DoAddIntoPortionBeforeBlob(const TBlobRangeLink16& bRange, TPortionInfoConstructor& portionInfo) const override;
virtual std::shared_ptr<IPortionDataChunk> DoCopyWithAnotherBlob(TString&& data, const TSimpleColumnInfo& /*columnInfo*/) const override {
return std::make_shared<TPortionIndexChunk>(GetChunkAddressVerified(), RecordsCount, RawBytes, std::move(data));
}
diff --git a/ydb/core/tx/columnshard/engines/storage/granule.cpp b/ydb/core/tx/columnshard/engines/storage/granule.cpp
index e1b6a9b74c1..c9b0d22d517 100644
--- a/ydb/core/tx/columnshard/engines/storage/granule.cpp
+++ b/ydb/core/tx/columnshard/engines/storage/granule.cpp
@@ -43,13 +43,6 @@ bool TGranuleMeta::ErasePortion(const ui64 portion) {
return true;
}
-void TGranuleMeta::AddColumnRecordOnLoad(const TIndexInfo& indexInfo, const TPortionInfo& portion, const TColumnChunkLoadContext& rec, const NKikimrTxColumnShard::TIndexPortionMeta* portionMeta) {
- std::shared_ptr<TPortionInfo> pInfo = UpsertPortionOnLoad(portion);
- TColumnRecord cRecord(pInfo->RegisterBlobId(rec.GetBlobRange().GetBlobId()), rec, indexInfo.GetColumnFeaturesVerified(rec.GetAddress().GetColumnId()));
- AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "AddColumnRecordOnLoad")("portion_info", portion.DebugString())("record", cRecord.DebugString());
- pInfo->AddRecord(indexInfo, cRecord, portionMeta);
-}
-
void TGranuleMeta::OnAfterChangePortion(const std::shared_ptr<TPortionInfo> portionAfter, NStorageOptimizer::IOptimizerPlanner::TModificationGuard* modificationGuard) {
if (portionAfter) {
AFL_VERIFY(PortionsByPK[portionAfter->IndexKeyStart()].emplace(portionAfter->GetPortion(), portionAfter).second);
@@ -161,17 +154,11 @@ bool TGranuleMeta::InCompaction() const {
return Activity.contains(EActivity::GeneralCompaction);
}
-std::shared_ptr<TPortionInfo> TGranuleMeta::UpsertPortionOnLoad(const TPortionInfo& portion) {
- auto it = Portions.find(portion.GetPortion());
- AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "UpsertPortionOnLoad")("portion_info", portion.DebugString());
- if (it == Portions.end()) {
- Y_ABORT_UNLESS(portion.Records.empty());
- auto portionNew = std::make_shared<TPortionInfo>(portion);
- it = Portions.emplace(portion.GetPortion(), portionNew).first;
- } else {
- AFL_VERIFY(it->second->TakeSnapshots(portion))("self", it->second->DebugString())("item", portion.DebugString());
- }
- return it->second;
+std::shared_ptr<TPortionInfo> TGranuleMeta::UpsertPortionOnLoad(TPortionInfo&& portion) {
+ auto portionId = portion.GetPortionId();
+ auto emplaceInfo = Portions.emplace(portionId, std::make_shared<TPortionInfo>(std::move(portion)));
+ AFL_VERIFY(emplaceInfo.second);
+ return emplaceInfo.first->second;
}
} // namespace NKikimr::NOlap
diff --git a/ydb/core/tx/columnshard/engines/storage/granule.h b/ydb/core/tx/columnshard/engines/storage/granule.h
index 7113b69256a..1002139fbf3 100644
--- a/ydb/core/tx/columnshard/engines/storage/granule.h
+++ b/ydb/core/tx/columnshard/engines/storage/granule.h
@@ -218,7 +218,6 @@ public:
void OnAfterPortionsLoad() {
auto g = OptimizerPlanner->StartModificationGuard();
for (auto&& i : Portions) {
- i.second->OnAfterLoad();
OnAfterChangePortion(i.second, &g);
}
}
@@ -272,9 +271,7 @@ public:
;
}
- std::shared_ptr<TPortionInfo> UpsertPortionOnLoad(const TPortionInfo& portion);
-
- void AddColumnRecordOnLoad(const TIndexInfo& indexInfo, const TPortionInfo& portion, const TColumnChunkLoadContext& rec, const NKikimrTxColumnShard::TIndexPortionMeta* portionMeta);
+ std::shared_ptr<TPortionInfo> UpsertPortionOnLoad(TPortionInfo&& portion);
const THashMap<ui64, std::shared_ptr<TPortionInfo>>& GetPortions() const {
return Portions;
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.h b/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.h
index 8090de1dee6..e4c284aea5b 100644
--- a/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.h
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.h
@@ -83,19 +83,11 @@ public:
THashMap<ui64, std::shared_ptr<TPortionInfo>> RemovePortions;
public:
TModificationGuard& AddPortion(const std::shared_ptr<TPortionInfo>& portion) {
- if (HasAppData() && AppDataVerified().ColumnShardConfig.GetSkipOldGranules() && portion->GetDeprecatedGranuleId() > 0) {
- AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "skip_granule")("granule_id", portion->GetDeprecatedGranuleId());
- return *this;
- }
AFL_VERIFY(AddPortions.emplace(portion->GetPortionId(), portion).second);
return*this;
}
TModificationGuard& RemovePortion(const std::shared_ptr<TPortionInfo>& portion) {
- if (HasAppData() && AppDataVerified().ColumnShardConfig.GetSkipOldGranules() && portion->GetDeprecatedGranuleId() > 0) {
- AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "skip_granule")("granule_id", portion->GetDeprecatedGranuleId());
- return *this;
- }
AFL_VERIFY(RemovePortions.emplace(portion->GetPortionId(), portion).second);
return*this;
}
diff --git a/ydb/core/tx/columnshard/engines/ut/ut_insert_table.cpp b/ydb/core/tx/columnshard/engines/ut/ut_insert_table.cpp
index 9ed9f504755..34a570d2de0 100644
--- a/ydb/core/tx/columnshard/engines/ut/ut_insert_table.cpp
+++ b/ydb/core/tx/columnshard/engines/ut/ut_insert_table.cpp
@@ -34,13 +34,13 @@ public:
virtual void ErasePortion(const NOlap::TPortionInfo& /*portion*/) override {
}
- virtual bool LoadPortions(const std::function<void(NOlap::TPortionInfo&&, const NKikimrTxColumnShard::TIndexPortionMeta&)>& /*callback*/) override {
+ virtual bool LoadPortions(const std::function<void(NOlap::TPortionInfoConstructor&&, const NKikimrTxColumnShard::TIndexPortionMeta&)>& /*callback*/) override {
return true;
}
void WriteColumn(const TPortionInfo&, const TColumnRecord&, const ui32 /*firstPKColumnId*/) override {}
void EraseColumn(const TPortionInfo&, const TColumnRecord&) override {}
- bool LoadColumns(const std::function<void(const TPortionInfo&, const TColumnChunkLoadContext&)>&) override { return true; }
+ bool LoadColumns(const std::function<void(NOlap::TPortionInfoConstructor&&, const TColumnChunkLoadContext&)>&) override { return true; }
virtual void WriteIndex(const TPortionInfo& /*portion*/, const TIndexChunk& /*row*/) override {}
virtual void EraseIndex(const TPortionInfo& /*portion*/, const TIndexChunk& /*row*/) override {}
diff --git a/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp b/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp
index 641b8e604b1..e8813bed509 100644
--- a/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp
+++ b/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp
@@ -35,7 +35,7 @@ private:
std::map<TPortionAddress, std::map<TChunkAddress, TColumnChunkLoadContext>> LoadContexts;
public:
struct TIndex {
- THashMap<ui64, THashMap<ui64, TPortionInfo>> Columns; // pathId -> portions
+ THashMap<ui64, THashMap<ui64, TPortionInfoConstructor>> Columns; // pathId -> portions
THashMap<ui32, ui64> Counters;
};
@@ -86,7 +86,7 @@ public:
virtual void ErasePortion(const NOlap::TPortionInfo& /*portion*/) override {
}
- virtual bool LoadPortions(const std::function<void(NOlap::TPortionInfo&&, const NKikimrTxColumnShard::TIndexPortionMeta&)>& /*callback*/) override {
+ virtual bool LoadPortions(const std::function<void(NOlap::TPortionInfoConstructor&&, const NKikimrTxColumnShard::TIndexPortionMeta&)>& /*callback*/) override {
return true;
}
@@ -104,19 +104,21 @@ public:
}
auto it = data.find(portion.GetPortion());
if (it == data.end()) {
- it = data.emplace(portion.GetPortion(), portion.CopyBeforeChunksRebuild()).first;
+ it = data.emplace(portion.GetPortion(), TPortionInfoConstructor(portion, false, true)).first;
} else {
- Y_ABORT_UNLESS(portion.GetPathId() == it->second.GetPathId() && portion.GetPortion() == it->second.GetPortion());
+ Y_ABORT_UNLESS(portion.GetPathId() == it->second.GetPathId() && portion.GetPortion() == it->second.GetPortionIdVerified());
}
it->second.SetMinSnapshotDeprecated(portion.GetMinSnapshotDeprecated());
if (portion.HasRemoveSnapshot()) {
- it->second.SetRemoveSnapshot(portion.GetRemoveSnapshotVerified());
+ if (!it->second.HasRemoveSnapshot()) {
+ it->second.SetRemoveSnapshot(portion.GetRemoveSnapshotVerified());
+ }
} else {
AFL_VERIFY(!it->second.HasRemoveSnapshot());
}
bool replaced = false;
- for (auto& rec : it->second.Records) {
+ for (auto& rec : it->second.MutableRecords()) {
if (rec.IsEqualTest(row)) {
rec = row;
replaced = true;
@@ -124,7 +126,7 @@ public:
}
}
if (!replaced) {
- it->second.Records.push_back(row);
+ it->second.MutableRecords().emplace_back(row);
}
}
@@ -135,26 +137,26 @@ public:
auto& portionLocal = it->second;
std::vector<TColumnRecord> filtered;
- for (auto& rec : portionLocal.Records) {
+ for (auto& rec : portionLocal.GetRecords()) {
if (!rec.IsEqualTest(row)) {
filtered.push_back(rec);
}
}
- portionLocal.Records.swap(filtered);
+ portionLocal.MutableRecords().swap(filtered);
}
- bool LoadColumns(const std::function<void(const TPortionInfo&, const TColumnChunkLoadContext&)>& callback) override {
+ bool LoadColumns(const std::function<void(NOlap::TPortionInfoConstructor&&, const TColumnChunkLoadContext&)>& callback) override {
auto& columns = Indices[0].Columns;
for (auto& [pathId, portions] : columns) {
for (auto& [portionId, portionLocal] : portions) {
auto copy = portionLocal;
- copy.ResetMeta();
- copy.Records.clear();
- for (const auto& rec : portionLocal.Records) {
+ copy.MutableRecords().clear();
+ for (const auto& rec : portionLocal.GetRecords()) {
auto itContextLoader = LoadContexts[copy.GetAddress()].find(rec.GetAddress());
Y_ABORT_UNLESS(itContextLoader != LoadContexts[copy.GetAddress()].end());
- callback(copy, itContextLoader->second);
- LoadContexts[copy.GetAddress()].erase(itContextLoader);
+ auto address = copy.GetAddress();
+ callback(std::move(copy), itContextLoader->second);
+ LoadContexts[address].erase(itContextLoader);
}
}
}
@@ -267,10 +269,10 @@ TString MakeTestBlob(i64 start = 0, i64 end = 100) {
void AddIdsToBlobs(std::vector<TWritePortionInfoWithBlobs>& portions, NBlobOperations::NRead::TCompositeReadBlobs& blobs, ui32& step) {
for (auto& portion : portions) {
- for (auto& rec : portion.GetPortionInfo().Records) {
- rec.BlobRange.BlobIdx = portion.GetPortionInfo().RegisterBlobId(MakeUnifiedBlobId(++step, portion.GetBlobFullSizeVerified(rec.ColumnId, rec.Chunk)));
+ for (auto& rec : portion.GetPortionConstructor().MutableRecords()) {
+ rec.BlobRange.BlobIdx = portion.GetPortionConstructor().RegisterBlobId(MakeUnifiedBlobId(++step, portion.GetBlobFullSizeVerified(rec.ColumnId, rec.Chunk)));
TString data = portion.GetBlobByRangeVerified(rec.ColumnId, rec.Chunk);
- blobs.Add(IStoragesManager::DefaultStorageId, portion.GetPortionInfo().RestoreBlobRange(rec.BlobRange), std::move(data));
+ blobs.Add(IStoragesManager::DefaultStorageId, portion.GetPortionConstructor().RestoreBlobRange(rec.BlobRange), std::move(data));
}
}
}
@@ -303,7 +305,7 @@ bool Insert(TColumnEngineForLogs& engine, TTestDbWrapper& db, TSnapshot snap,
AddIdsToBlobs(changes->AppendedPortions, blobs, step);
- const bool result = engine.ApplyChanges(db, changes, snap);
+ const bool result = engine.ApplyChangesOnTxCreate(changes, snap) && engine.ApplyChangesOnExecute(db, changes, snap);
NOlap::TWriteIndexContext contextExecute(nullptr, db, engine);
changes->WriteIndexOnExecute(nullptr, contextExecute);
@@ -334,15 +336,15 @@ bool Compact(TColumnEngineForLogs& engine, TTestDbWrapper& db, TSnapshot snap, N
// UNIT_ASSERT_VALUES_EQUAL(changes->GetTmpGranuleIds().size(), expected.NewGranules);
- const bool result = engine.ApplyChanges(db, changes, snap);
+ const bool result = engine.ApplyChangesOnTxCreate(changes, snap) && engine.ApplyChangesOnExecute(db, changes, snap);
NOlap::TWriteIndexContext contextExecute(nullptr, db, engine);
changes->WriteIndexOnExecute(nullptr, contextExecute);
NOlap::TWriteIndexCompleteContext contextComplete(NActors::TActivationContext::AsActorContext(), 0, 0, TDuration::Zero(), engine);
changes->WriteIndexOnComplete(nullptr, contextComplete);
if (blobsPool) {
for (auto&& i : changes->AppendedPortions) {
- for (auto&& r : i.GetPortionInfo().Records) {
- Y_ABORT_UNLESS(blobsPool->emplace(i.GetPortionInfo().RestoreBlobRange(r.BlobRange), i.GetBlobByRangeVerified(r.ColumnId, r.Chunk)).second);
+ for (auto&& r : i.GetPortionResult().GetRecords()) {
+ Y_ABORT_UNLESS(blobsPool->emplace(i.GetPortionResult().RestoreBlobRange(r.BlobRange), i.GetBlobByRangeVerified(r.ColumnId, r.Chunk)).second);
}
}
}
@@ -361,7 +363,7 @@ bool Cleanup(TColumnEngineForLogs& engine, TTestDbWrapper& db, TSnapshot snap, u
changes->StartEmergency();
- const bool result = engine.ApplyChanges(db, changes, snap);
+ const bool result = engine.ApplyChangesOnTxCreate(changes, snap) && engine.ApplyChangesOnExecute(db, changes, snap);
NOlap::TWriteIndexContext contextExecute(nullptr, db, engine);
changes->WriteIndexOnExecute(nullptr, contextExecute);
NOlap::TWriteIndexCompleteContext contextComplete(NActors::TActivationContext::AsActorContext(), 0, 0, TDuration::Zero(), engine);
@@ -375,11 +377,11 @@ bool Ttl(TColumnEngineForLogs& engine, TTestDbWrapper& db,
std::vector<std::shared_ptr<TTTLColumnEngineChanges>> vChanges = engine.StartTtl(pathEviction, EmptyDataLocksManager, 512 * 1024 * 1024);
AFL_VERIFY(vChanges.size() == 1)("count", vChanges.size());
auto changes = vChanges.front();
- UNIT_ASSERT_VALUES_EQUAL(changes->PortionsToRemove.size(), expectedToDrop);
+ UNIT_ASSERT_VALUES_EQUAL(changes->GetPortionsToRemove().size(), expectedToDrop);
changes->StartEmergency();
- const bool result = engine.ApplyChanges(db, changes, TSnapshot(1,1));
+ const bool result = engine.ApplyChangesOnTxCreate(changes, TSnapshot(1, 1)) && engine.ApplyChangesOnExecute(db, changes, TSnapshot(1, 1));
NOlap::TWriteIndexContext contextExecute(nullptr, db, engine);
changes->WriteIndexOnExecute(nullptr, contextExecute);
NOlap::TWriteIndexCompleteContext contextComplete(NActors::TActivationContext::AsActorContext(), 0, 0, TDuration::Zero(), engine);
diff --git a/ydb/core/tx/columnshard/normalizer/portion/min_max.cpp b/ydb/core/tx/columnshard/normalizer/portion/min_max.cpp
index 4923592882f..e459ba07086 100644
--- a/ydb/core/tx/columnshard/normalizer/portion/min_max.cpp
+++ b/ydb/core/tx/columnshard/normalizer/portion/min_max.cpp
@@ -2,8 +2,9 @@
#include "normalizer.h"
#include <ydb/core/tx/columnshard/engines/portions/portion_info.h>
-#include <ydb/core/tx/columnshard/tables_manager.h>
#include <ydb/core/tx/columnshard/engines/scheme/filtered_scheme.h>
+#include <ydb/core/tx/columnshard/engines/portions/constructor.h>
+#include <ydb/core/tx/columnshard/tables_manager.h>
#include <ydb/core/formats/arrow/arrow_helpers.h>
@@ -46,7 +47,7 @@ protected:
auto preparedBatch = portionInfo->PrepareForAssemble(**blobSchema, *filteredSchema, blobsDataAssemble);
auto batch = preparedBatch.Assemble();
Y_ABORT_UNLESS(!!batch);
- portionInfo->AddMetadata(**blobSchema, batch, portionInfo->GetMeta().GetTierName());
+// portionInfo->AddMetadata(**blobSchema, batch, portionInfo->GetMeta().GetTierName());
}
auto changes = std::make_shared<TPortionsNormalizer::TNormalizerResult>(std::move(Portions), std::move(firstPKColumnIdByPathId));
@@ -78,10 +79,7 @@ public:
return portion->GetTotalRawBytes();
}
- static bool CheckPortion(const TPortionInfo& portionInfo) {
- if (!portionInfo.GetMeta().HasPrimaryKeyBorders() || !portionInfo.GetMeta().HasSnapshotMinMax()) {
- return false;
- }
+ static bool CheckPortion(const TPortionInfo& /*portionInfo*/) {
return true;
}
@@ -114,7 +112,7 @@ public:
auto rowProto = chunk.GetMeta().SerializeToProto();
*rowProto.MutablePortionMeta() = portionInfo->GetMeta().SerializeToProto();
- db.Table<Schema::IndexColumns>().Key(0, portionInfo->GetDeprecatedGranuleId(), chunk.ColumnId,
+ db.Table<Schema::IndexColumns>().Key(0, portionInfo->GetPathId(), chunk.ColumnId,
portionInfo->GetMinSnapshotDeprecated().GetPlanStep(), portionInfo->GetMinSnapshotDeprecated().GetTxId(), portionInfo->GetPortion(), chunk.Chunk).Update(
NIceDb::TUpdate<Schema::IndexColumns::Metadata>(rowProto.SerializeAsString())
);
@@ -146,7 +144,7 @@ TConclusion<std::vector<INormalizerTask::TPtr>> TPortionsNormalizer::Init(const
return tasks;
}
- THashMap<ui64, std::shared_ptr<TPortionInfo>> portions;
+ THashMap<ui64, TPortionInfoConstructor> portions;
auto schemas = std::make_shared<THashMap<ui64, ISnapshotSchema::TPtr>>();
auto pkColumnIds = TMinMaxSnapshotChangesTask::GetColumnsFilter(tablesManager.GetPrimaryIndexSafe().GetVersionedIndex().GetLastSchema());
@@ -157,39 +155,32 @@ TConclusion<std::vector<INormalizerTask::TPtr>> TPortionsNormalizer::Init(const
}
TPortionInfo::TSchemaCursor schema(tablesManager.GetPrimaryIndexSafe().GetVersionedIndex());
- auto initPortionCB = [&](const TPortionInfo& portion, const TColumnChunkLoadContext& loadContext) {
+ auto initPortionCB = [&](TPortionInfoConstructor&& portion, const TColumnChunkLoadContext& loadContext) {
auto currentSchema = schema.GetSchema(portion);
- AFL_VERIFY(portion.ValidSnapshotInfo())("details", portion.DebugString());
if (!pkColumnIds.contains(loadContext.GetAddress().GetColumnId())) {
return;
}
- auto it = portions.find(portion.GetPortion());
- auto portionMeta = loadContext.GetPortionMeta();
+ auto it = portions.find(portion.GetPortionIdVerified());
if (it == portions.end()) {
- Y_ABORT_UNLESS(portion.Records.empty());
- (*schemas)[portion.GetPortionId()] = currentSchema;
- it = portions.emplace(portion.GetPortion(), std::make_shared<TPortionInfo>(portion)).first;
+ (*schemas)[portion.GetPortionIdVerified()] = currentSchema;
+ const ui64 portionId = portion.GetPortionIdVerified();
+ it = portions.emplace(portionId, std::move(portion)).first;
+ } else {
+ it->second.Merge(std::move(portion));
}
- TColumnRecord rec(it->second->RegisterBlobId(loadContext.GetBlobRange().GetBlobId()), loadContext, currentSchema->GetIndexInfo().GetColumnFeaturesVerified(loadContext.GetAddress().GetColumnId()));
- AFL_VERIFY(it->second->IsEqualWithSnapshots(portion))("self", it->second->DebugString())("item", portion.DebugString());
- it->second->AddRecord(currentSchema->GetIndexInfo(), rec, portionMeta);
+ it->second.LoadRecord(currentSchema->GetIndexInfo(), loadContext);
};
while (!rowset.EndOfSet()) {
- TPortionInfo portion = TPortionInfo::BuildEmpty();
- auto index = rowset.GetValue<Schema::IndexColumns::Index>();
- Y_ABORT_UNLESS(index == 0);
-
- portion.SetPathId(rowset.GetValue<Schema::IndexColumns::PathId>());
+ TPortionInfoConstructor portion(rowset.GetValue<Schema::IndexColumns::PathId>(), rowset.GetValue<Schema::IndexColumns::Portion>());
+ Y_ABORT_UNLESS(rowset.GetValue<Schema::IndexColumns::Index>() == 0);
portion.SetMinSnapshotDeprecated(NOlap::TSnapshot(rowset.GetValue<Schema::IndexColumns::PlanStep>(), rowset.GetValue<Schema::IndexColumns::TxId>()));
- portion.SetPortion(rowset.GetValue<Schema::IndexColumns::Portion>());
- portion.SetDeprecatedGranuleId(rowset.GetValue<Schema::IndexColumns::Granule>());
portion.SetRemoveSnapshot(rowset.GetValue<Schema::IndexColumns::XPlanStep>(), rowset.GetValue<Schema::IndexColumns::XTxId>());
NOlap::TColumnChunkLoadContext chunkLoadContext(rowset, &DsGroupSelector);
- initPortionCB(portion, chunkLoadContext);
+ initPortionCB(std::move(portion), chunkLoadContext);
if (!rowset.Next()) {
return TConclusionStatus::Fail("Not ready");
@@ -201,12 +192,13 @@ TConclusion<std::vector<INormalizerTask::TPtr>> TPortionsNormalizer::Init(const
package.reserve(100);
ui64 brokenPortioncCount = 0;
- for (auto&& portion : portions) {
- if (TMinMaxSnapshotChangesTask::CheckPortion(*portion.second)) {
+ for (auto&& portionConstructor : portions) {
+ auto portionInfo = std::make_shared<TPortionInfo>(portionConstructor.second.Build(false));
+ if (TMinMaxSnapshotChangesTask::CheckPortion(*portionInfo)) {
continue;
}
++brokenPortioncCount;
- package.emplace_back(portion.second);
+ package.emplace_back(portionInfo);
if (package.size() == 1000) {
std::vector<std::shared_ptr<TPortionInfo>> local;
local.swap(package);
diff --git a/ydb/core/tx/columnshard/splitter/abstract/chunks.h b/ydb/core/tx/columnshard/splitter/abstract/chunks.h
index e873ff80560..e3be37be2bd 100644
--- a/ydb/core/tx/columnshard/splitter/abstract/chunks.h
+++ b/ydb/core/tx/columnshard/splitter/abstract/chunks.h
@@ -13,6 +13,7 @@ class TSplitterCounters;
namespace NKikimr::NOlap {
class TPortionInfo;
+class TPortionInfoConstructor;
class TSimpleColumnInfo;
class TColumnSaver;
@@ -33,7 +34,7 @@ protected:
virtual std::optional<ui32> DoGetRecordsCount() const = 0;
virtual std::shared_ptr<arrow::Scalar> DoGetFirstScalar() const = 0;
virtual std::shared_ptr<arrow::Scalar> DoGetLastScalar() const = 0;
- virtual void DoAddIntoPortionBeforeBlob(const TBlobRangeLink16& bRange, TPortionInfo& portionInfo) const = 0;
+ virtual void DoAddIntoPortionBeforeBlob(const TBlobRangeLink16& bRange, TPortionInfoConstructor& portionInfo) const = 0;
virtual std::shared_ptr<IPortionDataChunk> DoCopyWithAnotherBlob(TString&& /*data*/, const TSimpleColumnInfo& /*columnInfo*/) const {
AFL_VERIFY(false);
return nullptr;
@@ -116,7 +117,7 @@ public:
}
}
- void AddIntoPortionBeforeBlob(const TBlobRangeLink16& bRange, TPortionInfo& portionInfo) const {
+ void AddIntoPortionBeforeBlob(const TBlobRangeLink16& bRange, TPortionInfoConstructor& portionInfo) const {
AFL_VERIFY(!bRange.IsValid());
return DoAddIntoPortionBeforeBlob(bRange, portionInfo);
}
diff --git a/ydb/core/tx/columnshard/splitter/chunks.cpp b/ydb/core/tx/columnshard/splitter/chunks.cpp
index c00c3f43e61..8ebbd736c12 100644
--- a/ydb/core/tx/columnshard/splitter/chunks.cpp
+++ b/ydb/core/tx/columnshard/splitter/chunks.cpp
@@ -1,6 +1,7 @@
#include "chunks.h"
#include <ydb/core/tx/columnshard/engines/portions/column_record.h>
#include <ydb/core/tx/columnshard/engines/portions/portion_info.h>
+#include <ydb/core/tx/columnshard/engines/portions/constructor.h>
namespace NKikimr::NOlap {
@@ -24,7 +25,7 @@ std::vector<std::shared_ptr<IPortionDataChunk>> IPortionColumnChunk::DoInternalS
return result;
}
-void IPortionColumnChunk::DoAddIntoPortionBeforeBlob(const TBlobRangeLink16& bRange, TPortionInfo& portionInfo) const {
+void IPortionColumnChunk::DoAddIntoPortionBeforeBlob(const TBlobRangeLink16& bRange, TPortionInfoConstructor& portionInfo) const {
AFL_VERIFY(!bRange.IsValid());
TColumnRecord rec(GetChunkAddressVerified(), bRange, BuildSimpleChunkMeta());
portionInfo.AppendOneChunkColumn(std::move(rec));
diff --git a/ydb/core/tx/columnshard/splitter/chunks.h b/ydb/core/tx/columnshard/splitter/chunks.h
index bd7d6c80efd..eae2f4d58fd 100644
--- a/ydb/core/tx/columnshard/splitter/chunks.h
+++ b/ydb/core/tx/columnshard/splitter/chunks.h
@@ -19,7 +19,7 @@ protected:
return DoGetRecordsCountImpl();
}
- virtual void DoAddIntoPortionBeforeBlob(const TBlobRangeLink16& bRange, TPortionInfo& portionInfo) const override;
+ virtual void DoAddIntoPortionBeforeBlob(const TBlobRangeLink16& bRange, TPortionInfoConstructor& portionInfo) const override;
virtual std::vector<std::shared_ptr<IPortionDataChunk>> DoInternalSplitImpl(const TColumnSaver& saver, const std::shared_ptr<NColumnShard::TSplitterCounters>& counters, const std::vector<ui64>& splitSizes) const = 0;
virtual std::vector<std::shared_ptr<IPortionDataChunk>> DoInternalSplit(const TColumnSaver& saver, const std::shared_ptr<NColumnShard::TSplitterCounters>& counters, const std::vector<ui64>& splitSizes) const override;
diff --git a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
index e084c3386f5..65277b9f151 100644
--- a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
+++ b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
@@ -2513,9 +2513,9 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
Y_ABORT_UNLESS(append->AppendedPortions.size());
Cerr << "Added portions:";
for (const auto& portion : append->AppendedPortions) {
+ Y_UNUSED(portion);
++addedPortions;
- ui64 portionId = addedPortions;
- Cerr << " " << portionId << "(" << portion.GetPortionInfo().GetPortion() << ")";
+ Cerr << " " << addedPortions;
}
Cerr << Endl;
}