diff options
| author | ivanmorozov333 <[email protected]> | 2024-04-15 16:12:59 +0300 |
|---|---|---|
| committer | GitHub <[email protected]> | 2024-04-15 16:12:59 +0300 |
| commit | 5a5d4cca3670fcdc555c7fa5744145787ed7030f (patch) | |
| tree | 9b64f997165abfcb49a27d01fc769907c5438f2a | |
| parent | 260fcedc15b1ff31294e9dffccd6a0a679a80aa8 (diff) | |
split portion as object and portion constructor. same with portion meta (#3717)
46 files changed, 925 insertions, 585 deletions
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index 2706be6a264..a6385e5c72a 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -1391,7 +1391,6 @@ message TColumnShardConfig { } optional TTablesStorageLayoutPolicy TablesStorageLayoutPolicy = 1; optional bool DisabledOnSchemeShard = 2 [default = true]; - optional bool SkipOldGranules = 3 [default = true]; optional bool IndexationEnabled = 4 [default = true]; optional bool CompactionEnabled = 5 [default = true]; optional bool TTLEnabled = 6 [default = true]; diff --git a/ydb/core/tx/columnshard/blobs_action/transaction/tx_write_index.cpp b/ydb/core/tx/columnshard/blobs_action/transaction/tx_write_index.cpp index 977adb585ea..750ffefa239 100644 --- a/ydb/core/tx/columnshard/blobs_action/transaction/tx_write_index.cpp +++ b/ydb/core/tx/columnshard/blobs_action/transaction/tx_write_index.cpp @@ -21,7 +21,7 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext& ctx) TBlobGroupSelector dsGroupSelector(Self->Info()); NOlap::TDbWrapper dbWrap(txc.DB, &dsGroupSelector); - AFL_VERIFY(Self->TablesManager.MutablePrimaryIndex().ApplyChanges(dbWrap, changes, snapshot)); + AFL_VERIFY(Self->TablesManager.MutablePrimaryIndex().ApplyChangesOnExecute(dbWrap, changes, snapshot)); LOG_S_DEBUG(TxPrefix() << "(" << changes->TypeString() << ") apply" << TxSuffix()); NOlap::TWriteIndexContext context(&txc.DB, dbWrap, Self->MutableIndexAs<NOlap::TColumnEngineForLogs>()); changes->WriteIndexOnExecute(Self, context); @@ -35,10 +35,8 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext& ctx) NOlap::TBlobManagerDb blobsDb(txc.DB); changes->MutableBlobsAction().OnExecuteTxAfterAction(*Self, blobsDb, false); for (ui32 i = 0; i < changes->GetWritePortionsCount(); ++i) { - auto& portion = changes->GetWritePortionInfo(i)->GetPortionInfo(); - for (auto&& i : portion.Records) { - LOG_S_WARN(TxPrefix() << "(" << changes->TypeString() << ":" << portion.RestoreBlobRange(i.BlobRange) << ") blob cannot apply changes: " << TxSuffix()); - } + auto& portion = changes->GetWritePortionInfo(i)->GetPortionResult(); + LOG_S_WARN(TxPrefix() << "(" << changes->TypeString() << ":" << portion.DebugString() << ") blob cannot apply changes: " << TxSuffix()); } NOlap::TChangesFinishContext context("cannot write index blobs"); changes->Abort(*Self, context); @@ -83,6 +81,9 @@ TTxWriteIndex::TTxWriteIndex(TColumnShard* self, TEvPrivate::TEvWriteIndex::TPtr , Ev(ev) , TabletTxNo(++Self->TabletTxCounter) { + NOlap::TSnapshot snapshot(Self->LastPlannedStep, Self->LastPlannedTxId); + auto changes = Ev->Get()->IndexChanges; + AFL_VERIFY(Self->TablesManager.MutablePrimaryIndex().ApplyChangesOnTxCreate(changes, snapshot)); Y_ABORT_UNLESS(Ev && Ev->Get()->IndexChanges); } diff --git a/ydb/core/tx/columnshard/engines/changes/abstract/abstract.cpp b/ydb/core/tx/columnshard/engines/changes/abstract/abstract.cpp index 295f182216c..c3496e54aee 100644 --- a/ydb/core/tx/columnshard/engines/changes/abstract/abstract.cpp +++ b/ydb/core/tx/columnshard/engines/changes/abstract/abstract.cpp @@ -53,12 +53,10 @@ void TColumnEngineChanges::WriteIndexOnComplete(NColumnShard::TColumnShard* self void TColumnEngineChanges::Compile(TFinalizationContext& context) noexcept { Y_ABORT_UNLESS(Stage != EStage::Aborted); - if ((ui32)Stage >= (ui32)EStage::Compiled) { - return; - } Y_ABORT_UNLESS(Stage == EStage::Constructed); DoCompile(context); + DoOnAfterCompile(); Stage = EStage::Compiled; } diff --git a/ydb/core/tx/columnshard/engines/changes/abstract/abstract.h b/ydb/core/tx/columnshard/engines/changes/abstract/abstract.h index 339face80d3..d65cdc755f4 100644 --- a/ydb/core/tx/columnshard/engines/changes/abstract/abstract.h +++ b/ydb/core/tx/columnshard/engines/changes/abstract/abstract.h @@ -201,6 +201,7 @@ private: protected: virtual void DoDebugString(TStringOutput& out) const = 0; virtual void DoCompile(TFinalizationContext& context) = 0; + virtual void DoOnAfterCompile() {} virtual void DoWriteIndexOnExecute(NColumnShard::TColumnShard* self, TWriteIndexContext& context) = 0; virtual void DoWriteIndexOnComplete(NColumnShard::TColumnShard* self, TWriteIndexCompleteContext& context) = 0; virtual void DoOnFinish(NColumnShard::TColumnShard& self, TChangesFinishContext& context) = 0; diff --git a/ydb/core/tx/columnshard/engines/changes/actualization/construction/context.cpp b/ydb/core/tx/columnshard/engines/changes/actualization/construction/context.cpp index 6b1ea617f21..919abdfa967 100644 --- a/ydb/core/tx/columnshard/engines/changes/actualization/construction/context.cpp +++ b/ydb/core/tx/columnshard/engines/changes/actualization/construction/context.cpp @@ -56,7 +56,7 @@ bool TTieringProcessContext::AddPortion(const TPortionInfo& info, TPortionEvicti if (features.GetTargetTierName() == NTiering::NCommon::DeleteTierName) { AFL_VERIFY(dWait); Counters.OnPortionToDrop(info.GetTotalBlobBytes(), *dWait); - it->second.back().GetTask()->PortionsToRemove.emplace(info.GetAddress(), info); + it->second.back().GetTask()->AddPortionToRemove(info); AFL_VERIFY(!it->second.back().GetTask()->GetPortionsToEvictCount())("rw", features.GetRWAddress().DebugString())("f", it->first.DebugString()); } else { if (!dWait) { @@ -65,7 +65,7 @@ bool TTieringProcessContext::AddPortion(const TPortionInfo& info, TPortionEvicti Counters.OnPortionToEvict(info.GetTotalBlobBytes(), *dWait); } it->second.back().GetTask()->AddPortionToEvict(info, std::move(features)); - AFL_VERIFY(it->second.back().GetTask()->PortionsToRemove.empty())("rw", features.GetRWAddress().DebugString())("f", it->first.DebugString()); + AFL_VERIFY(!it->second.back().GetTask()->HasPortionsToRemove())("rw", features.GetRWAddress().DebugString())("f", it->first.DebugString()); } return true; } diff --git a/ydb/core/tx/columnshard/engines/changes/compaction.cpp b/ydb/core/tx/columnshard/engines/changes/compaction.cpp index 24e1e8c5c0e..a94d160158e 100644 --- a/ydb/core/tx/columnshard/engines/changes/compaction.cpp +++ b/ydb/core/tx/columnshard/engines/changes/compaction.cpp @@ -23,7 +23,7 @@ void TCompactColumnEngineChanges::DoCompile(TFinalizationContext& context) { const TPortionMeta::EProduced producedClassResultCompaction = GetResultProducedClass(); for (auto& portionInfo : AppendedPortions) { - portionInfo.GetPortionInfo().UpdateRecordsMeta(producedClassResultCompaction); + portionInfo.GetPortionConstructor().MutableMeta().UpdateRecordsMeta(producedClassResultCompaction); } } @@ -77,7 +77,7 @@ TCompactColumnEngineChanges::TCompactColumnEngineChanges(std::shared_ptr<TGranul for (const auto& portionInfo : portions) { Y_ABORT_UNLESS(!portionInfo->HasRemoveSnapshot()); SwitchedPortions.emplace_back(*portionInfo); - AFL_VERIFY(PortionsToRemove.emplace(portionInfo->GetAddress(), *portionInfo).second); + AddPortionToRemove(*portionInfo); Y_ABORT_UNLESS(portionInfo->GetPathId() == GranuleMeta->GetPathId()); } Y_ABORT_UNLESS(SwitchedPortions.size()); diff --git a/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp b/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp index 706a0924c5c..3f5e692df4a 100644 --- a/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp +++ b/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp @@ -200,7 +200,8 @@ void TGeneralCompactColumnEngineChanges::BuildAppendedPortionsByChunks(TConstruc AppendedPortions.back().FillStatistics(resultSchema->GetIndexInfo()); NArrow::TFirstLastSpecialKeys primaryKeys(slice.GetFirstLastPKBatch(resultSchema->GetIndexInfo().GetReplaceKey())); NArrow::TMinMaxSpecialKeys snapshotKeys(b, TIndexInfo::ArrowSchemaSnapshot()); - AppendedPortions.back().GetPortionInfo().AddMetadata(*resultSchema, primaryKeys, snapshotKeys, IStoragesManager::DefaultStorageId); + AppendedPortions.back().GetPortionConstructor().AddMetadata(*resultSchema, primaryKeys, snapshotKeys); + AppendedPortions.back().GetPortionConstructor().MutableMeta().SetTierName(IStoragesManager::DefaultStorageId); recordIdx += slice.GetRecordsCount(); } } @@ -245,7 +246,7 @@ TConclusionStatus TGeneralCompactColumnEngineChanges::DoConstructBlobs(TConstruc TStringBuilder sbAppended; for (auto&& p : AppendedPortions) { - sbAppended << p.GetPortionInfo().DebugString() << ";"; + sbAppended << p.GetPortionConstructor().DebugString() << ";"; } AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "blobs_created_diff")("appended", sbAppended)("switched", sbSwitched); } diff --git a/ydb/core/tx/columnshard/engines/changes/ttl.cpp b/ydb/core/tx/columnshard/engines/changes/ttl.cpp index c2107a3515d..f23c799e331 100644 --- a/ydb/core/tx/columnshard/engines/changes/ttl.cpp +++ b/ydb/core/tx/columnshard/engines/changes/ttl.cpp @@ -14,7 +14,7 @@ void TTTLColumnEngineChanges::DoDebugString(TStringOutput& out) const { } void TTTLColumnEngineChanges::DoStart(NColumnShard::TColumnShard& self) { - Y_ABORT_UNLESS(PortionsToEvict.size() || PortionsToRemove.size()); + Y_ABORT_UNLESS(PortionsToEvict.size() || HasPortionsToRemove()); THashMap<TString, THashSet<TBlobRange>> blobRanges; auto& engine = self.MutableIndexAs<TColumnEngineForLogs>(); auto& index = engine.GetVersionedIndex(); @@ -39,7 +39,7 @@ void TTTLColumnEngineChanges::DoOnFinish(NColumnShard::TColumnShard& self, TChan for (auto&& i : PortionsToEvict) { AFL_VERIFY(restoreIndexAddresses[i.GetPortionInfo().GetPathId()].emplace(i.GetPortionInfo().GetPortionId()).second); } - for (auto&& i : PortionsToRemove) { + for (auto&& i : GetPortionsToRemove()) { AFL_VERIFY(restoreIndexAddresses[i.first.GetPathId()].emplace(i.first.GetPortionId()).second); } engine.ReturnToIndexes(restoreIndexAddresses); @@ -66,8 +66,7 @@ NKikimr::TConclusionStatus TTTLColumnEngineChanges::DoConstructBlobs(TConstructi for (auto&& info : PortionsToEvict) { if (auto pwb = UpdateEvictedPortion(info, Blobs, context)) { - info.MutablePortionInfo().SetRemoveSnapshot(context.LastCommittedTx); - AFL_VERIFY(PortionsToRemove.emplace(info.GetPortionInfo().GetAddress(), info.GetPortionInfo()).second); + AddPortionToRemove(info.GetPortionInfo()); AppendedPortions.emplace_back(std::move(*pwb)); } } diff --git a/ydb/core/tx/columnshard/engines/changes/ttl.h b/ydb/core/tx/columnshard/engines/changes/ttl.h index 27d76b750dc..92eb0ffa9b3 100644 --- a/ydb/core/tx/columnshard/engines/changes/ttl.h +++ b/ydb/core/tx/columnshard/engines/changes/ttl.h @@ -94,7 +94,6 @@ public: ui32 GetPortionsToEvictCount() const { return PortionsToEvict.size(); } - void AddPortionToEvict(const TPortionInfo& info, TPortionEvictionFeatures&& features) { Y_ABORT_UNLESS(!info.Empty()); Y_ABORT_UNLESS(!info.HasRemoveSnapshot()); diff --git a/ydb/core/tx/columnshard/engines/changes/with_appended.cpp b/ydb/core/tx/columnshard/engines/changes/with_appended.cpp index ba99045e723..4b759675c17 100644 --- a/ydb/core/tx/columnshard/engines/changes/with_appended.cpp +++ b/ydb/core/tx/columnshard/engines/changes/with_appended.cpp @@ -17,7 +17,7 @@ void TChangesWithAppend::DoWriteIndexOnExecute(NColumnShard::TColumnShard* self, portionInfo.SaveToDatabase(context.DBWrapper, schemaPtr->GetIndexInfo().GetPKFirstColumnId(), true); } const auto predRemoveDroppedTable = [self](const TWritePortionInfoWithBlobs& item) { - auto& portionInfo = item.GetPortionInfo(); + auto& portionInfo = item.GetPortionResult(); if (!!self && (!self->TablesManager.HasTable(portionInfo.GetPathId()) || self->TablesManager.GetTable(portionInfo.GetPathId()).IsDropped())) { AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "skip_inserted_data")("reason", "table_removed")("path_id", portionInfo.GetPathId()); return true; @@ -27,8 +27,7 @@ void TChangesWithAppend::DoWriteIndexOnExecute(NColumnShard::TColumnShard* self, }; AppendedPortions.erase(std::remove_if(AppendedPortions.begin(), AppendedPortions.end(), predRemoveDroppedTable), AppendedPortions.end()); for (auto& portionInfoWithBlobs : AppendedPortions) { - auto& portionInfo = portionInfoWithBlobs.GetPortionInfo(); - Y_ABORT_UNLESS(!portionInfo.Empty()); + auto& portionInfo = portionInfoWithBlobs.GetPortionResult(); AFL_VERIFY(usedPortionIds.emplace(portionInfo.GetPortionId()).second)("portion_info", portionInfo.DebugString(true)); portionInfo.SaveToDatabase(context.DBWrapper, schemaPtr->GetIndexInfo().GetPKFirstColumnId(), false); } @@ -36,8 +35,9 @@ void TChangesWithAppend::DoWriteIndexOnExecute(NColumnShard::TColumnShard* self, void TChangesWithAppend::DoWriteIndexOnComplete(NColumnShard::TColumnShard* self, TWriteIndexCompleteContext& context) { if (self) { - for (auto& portionInfo : AppendedPortions) { - switch (portionInfo.GetPortionInfo().GetMeta().Produced) { + for (auto& portionBuilder : AppendedPortions) { + auto& portionInfo = portionBuilder.GetPortionResult(); + switch (portionInfo.GetMeta().Produced) { case NOlap::TPortionMeta::EProduced::UNSPECIFIED: Y_ABORT_UNLESS(false); // unexpected case NOlap::TPortionMeta::EProduced::INSERTED: @@ -79,22 +79,25 @@ void TChangesWithAppend::DoWriteIndexOnComplete(NColumnShard::TColumnShard* self const TPortionInfo& oldInfo = context.EngineLogs.GetGranuleVerified(portionInfo.GetPathId()).GetPortionVerified(portionInfo.GetPortion()); context.EngineLogs.UpsertPortion(portionInfo, &oldInfo); } - for (auto& portionInfoWithBlobs : AppendedPortions) { - auto& portionInfo = portionInfoWithBlobs.GetPortionInfo(); - context.EngineLogs.UpsertPortion(portionInfo); + for (auto& portionBuilder : AppendedPortions) { + context.EngineLogs.UpsertPortion(portionBuilder.GetPortionResult()); } } } void TChangesWithAppend::DoCompile(TFinalizationContext& context) { for (auto&& i : AppendedPortions) { - i.GetPortionInfo().SetPortion(context.NextPortionId()); - i.GetPortionInfo().UpdateRecordsMeta(TPortionMeta::EProduced::INSERTED); + i.GetPortionConstructor().SetPortionId(context.NextPortionId()); + i.GetPortionConstructor().MutableMeta().UpdateRecordsMeta(TPortionMeta::EProduced::INSERTED); } for (auto& [_, portionInfo] : PortionsToRemove) { - if (!portionInfo.HasRemoveSnapshot()) { - portionInfo.SetRemoveSnapshot(context.GetSnapshot()); - } + portionInfo.SetRemoveSnapshot(context.GetSnapshot()); + } +} + +void TChangesWithAppend::DoOnAfterCompile() { + for (auto&& i : AppendedPortions) { + i.FinalizePortionConstructor(); } } @@ -134,7 +137,8 @@ std::vector<TWritePortionInfoWithBlobs> TChangesWithAppend::MakeAppendedPortions out.back().FillStatistics(resultSchema->GetIndexInfo()); NArrow::TFirstLastSpecialKeys primaryKeys(slice.GetFirstLastPKBatch(resultSchema->GetIndexInfo().GetReplaceKey())); NArrow::TMinMaxSpecialKeys snapshotKeys(b, TIndexInfo::ArrowSchemaSnapshot()); - out.back().GetPortionInfo().AddMetadata(*resultSchema, primaryKeys, snapshotKeys, IStoragesManager::DefaultStorageId); + out.back().GetPortionConstructor().AddMetadata(*resultSchema, b); + out.back().GetPortionConstructor().MutableMeta().SetTierName(IStoragesManager::DefaultStorageId); recordIdx += slice.GetRecordsCount(); } } diff --git a/ydb/core/tx/columnshard/engines/changes/with_appended.h b/ydb/core/tx/columnshard/engines/changes/with_appended.h index 1a99a11135f..4c5fbc2189c 100644 --- a/ydb/core/tx/columnshard/engines/changes/with_appended.h +++ b/ydb/core/tx/columnshard/engines/changes/with_appended.h @@ -9,10 +9,11 @@ namespace NKikimr::NOlap { class TChangesWithAppend: public TColumnEngineChanges { private: using TBase = TColumnEngineChanges; - + THashMap<TPortionAddress, TPortionInfo> PortionsToRemove; protected: TSaverContext SaverContext; virtual void DoCompile(TFinalizationContext& context) override; + virtual void DoOnAfterCompile() override; virtual void DoWriteIndexOnExecute(NColumnShard::TColumnShard* self, TWriteIndexContext& context) override; virtual void DoWriteIndexOnComplete(NColumnShard::TColumnShard* self, TWriteIndexCompleteContext& context) override; virtual void DoStart(NColumnShard::TColumnShard& self) override; @@ -35,7 +36,6 @@ protected: return selfLock; } } - public: TChangesWithAppend(const TSaverContext& saverContext, const NBlobOperations::EConsumer consumerId) : TBase(saverContext.GetStoragesManager(), consumerId) @@ -44,7 +44,23 @@ public: } - THashMap<TPortionAddress, TPortionInfo> PortionsToRemove; + const THashMap<TPortionAddress, TPortionInfo>& GetPortionsToRemove() const { + return PortionsToRemove; + } + + ui32 GetPortionsToRemoveSize() const { + return PortionsToRemove.size(); + } + + bool HasPortionsToRemove() const { + return PortionsToRemove.size(); + } + + void AddPortionToRemove(const TPortionInfo& info) { + AFL_VERIFY(!info.HasRemoveSnapshot()); + AFL_VERIFY(PortionsToRemove.emplace(info.GetAddress(), info).second); + } + std::vector<TWritePortionInfoWithBlobs> AppendedPortions; virtual ui32 GetWritePortionsCount() const override { return AppendedPortions.size(); diff --git a/ydb/core/tx/columnshard/engines/column_engine.cpp b/ydb/core/tx/columnshard/engines/column_engine.cpp index d6f46742093..0b1979f537b 100644 --- a/ydb/core/tx/columnshard/engines/column_engine.cpp +++ b/ydb/core/tx/columnshard/engines/column_engine.cpp @@ -1,4 +1,5 @@ #include "column_engine.h" +#include "portions/portion_info.h" #include <ydb/core/base/appdata.h> #include <util/system/info.h> @@ -21,4 +22,37 @@ ui64 IColumnEngine::GetMetadataLimit() { } } +size_t TSelectInfo::NumChunks() const { + size_t records = 0; + for (auto& portionInfo : PortionsOrderedPK) { + records += portionInfo->NumChunks(); + } + return records; +} + +TSelectInfo::TStats TSelectInfo::Stats() const { + TStats out; + out.Portions = PortionsOrderedPK.size(); + + THashSet<TUnifiedBlobId> uniqBlob; + for (auto& portionInfo : PortionsOrderedPK) { + out.Records += portionInfo->NumChunks(); + out.Rows += portionInfo->NumRows(); + for (auto& rec : portionInfo->Records) { + out.Bytes += rec.BlobRange.Size; + } + out.Blobs += portionInfo->GetBlobIdsCount(); + } + return out; +} + +void TSelectInfo::DebugStream(IOutputStream& out) { + if (PortionsOrderedPK.size()) { + out << "portions:"; + for (auto& portionInfo : PortionsOrderedPK) { + out << portionInfo->DebugString(); + } + } +} + } diff --git a/ydb/core/tx/columnshard/engines/column_engine.h b/ydb/core/tx/columnshard/engines/column_engine.h index aba511eec9b..58a2d8eb310 100644 --- a/ydb/core/tx/columnshard/engines/column_engine.h +++ b/ydb/core/tx/columnshard/engines/column_engine.h @@ -1,9 +1,9 @@ #pragma once #include "db_wrapper.h" -#include "scheme/snapshot_scheme.h" -#include "predicate/filter.h" #include "changes/abstract/settings.h" #include "changes/abstract/compaction_info.h" +#include "predicate/filter.h" +#include "scheme/snapshot_scheme.h" #include "scheme/versions/versioned_index.h" #include <ydb/core/tx/columnshard/common/reverse_accessor.h> @@ -49,39 +49,11 @@ struct TSelectInfo { return NColumnShard::TContainerAccessorWithDirection<std::vector<std::shared_ptr<TPortionInfo>>>(PortionsOrderedPK, reverse); } - size_t NumChunks() const { - size_t records = 0; - for (auto& portionInfo : PortionsOrderedPK) { - records += portionInfo->NumChunks(); - } - return records; - } + size_t NumChunks() const; - TStats Stats() const { - TStats out; - out.Portions = PortionsOrderedPK.size(); + TStats Stats() const; - THashSet<TUnifiedBlobId> uniqBlob; - for (auto& portionInfo : PortionsOrderedPK) { - out.Records += portionInfo->NumChunks(); - out.Rows += portionInfo->NumRows(); - for (auto& rec : portionInfo->Records) { - out.Bytes += rec.BlobRange.Size; - } - out.Blobs += portionInfo->GetBlobIdsCount(); - } - return out; - } - - friend IOutputStream& operator << (IOutputStream& out, const TSelectInfo& info) { - if (info.PortionsOrderedPK.size()) { - out << "portions:"; - for (auto& portionInfo : info.PortionsOrderedPK) { - out << portionInfo->DebugString(); - } - } - return out; - } + void DebugStream(IOutputStream& out); }; class TColumnEngineStats { @@ -317,7 +289,8 @@ public: virtual std::shared_ptr<TCleanupTablesColumnEngineChanges> StartCleanupTables(THashSet<ui64>& pathsToDrop) noexcept = 0; virtual std::vector<std::shared_ptr<TTTLColumnEngineChanges>> StartTtl(const THashMap<ui64, TTiering>& pathEviction, const std::shared_ptr<NDataLocks::TManager>& dataLocksManager, const ui64 memoryUsageLimit) noexcept = 0; - virtual bool ApplyChanges(IDbWrapper& db, std::shared_ptr<TColumnEngineChanges> changes, const TSnapshot& snapshot) noexcept = 0; + virtual bool ApplyChangesOnTxCreate(std::shared_ptr<TColumnEngineChanges> changes, const TSnapshot& snapshot) noexcept = 0; + virtual bool ApplyChangesOnExecute(IDbWrapper& db, std::shared_ptr<TColumnEngineChanges> changes, const TSnapshot& snapshot) noexcept = 0; virtual void RegisterSchemaVersion(const TSnapshot& snapshot, TIndexInfo&& info) = 0; virtual void RegisterSchemaVersion(const TSnapshot& snapshot, const NKikimrSchemeOp::TColumnTableSchema& schema) = 0; virtual const TMap<ui64, std::shared_ptr<TColumnEngineStats>>& GetStats() const = 0; diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp index 2be9f777137..49e039aa1df 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp @@ -7,6 +7,7 @@ #include "changes/cleanup_portions.h" #include "changes/cleanup_tables.h" #include "changes/ttl.h" +#include "portions/constructor.h" #include <ydb/core/tx/columnshard/common/limits.h> #include <ydb/core/tx/columnshard/hooks/abstract/abstract.h> @@ -185,11 +186,12 @@ bool TColumnEngineForLogs::Load(IDbWrapper& db) { } bool TColumnEngineForLogs::LoadColumns(IDbWrapper& db) { + TPortionConstructors constructors; { - if (!db.LoadPortions([&](TPortionInfo&& portion, const NKikimrTxColumnShard::TIndexPortionMeta& metaProto) { + if (!db.LoadPortions([&](TPortionInfoConstructor&& portion, const NKikimrTxColumnShard::TIndexPortionMeta& metaProto) { const TIndexInfo& indexInfo = portion.GetSchema(VersionedIndex)->GetIndexInfo(); - portion.LoadMetadata(metaProto, indexInfo); - GetGranulePtrVerified(portion.GetPathId())->UpsertPortionOnLoad(std::move(portion)); + AFL_VERIFY(portion.MutableMeta().LoadMetadata(metaProto, indexInfo)); + AFL_VERIFY(constructors.AddConstructorVerified(std::move(portion))); })) { return false; } @@ -197,25 +199,28 @@ bool TColumnEngineForLogs::LoadColumns(IDbWrapper& db) { { TPortionInfo::TSchemaCursor schema(VersionedIndex); - if (!db.LoadColumns([&](const TPortionInfo& portion, const TColumnChunkLoadContext& loadContext) { + if (!db.LoadColumns([&](TPortionInfoConstructor&& portion, const TColumnChunkLoadContext& loadContext) { + auto* constructor = constructors.MergeConstructor(std::move(portion)); auto currentSchema = schema.GetSchema(portion); - AFL_VERIFY(portion.ValidSnapshotInfo())("details", portion.DebugString()); - // Locate granule and append the record. - GetGranulePtrVerified(portion.GetPathId())->AddColumnRecordOnLoad(currentSchema->GetIndexInfo(), portion, loadContext, loadContext.GetPortionMeta()); + constructor->LoadRecord(currentSchema->GetIndexInfo(), loadContext); })) { return false; } } - - if (!db.LoadIndexes([&](const ui64 pathId, const ui64 portionId, const TIndexChunkLoadContext& loadContext) { - auto portion = GetGranulePtrVerified(pathId)->GetPortionOptional(portionId); - AFL_VERIFY(portion); - const auto linkBlobId = portion->RegisterBlobId(loadContext.GetBlobRange().GetBlobId()); - portion->AddIndex(loadContext.BuildIndexChunk(linkBlobId)); - })) { - return false; - }; - + { + if (!db.LoadIndexes([&](const ui64 pathId, const ui64 portionId, const TIndexChunkLoadContext& loadContext) { + auto* constructor = constructors.GetConstructorVerified(pathId, portionId); + constructor->LoadIndex(loadContext); + })) { + return false; + }; + } + for (auto&& [granuleId, pathConstructors] : constructors) { + auto g = GetGranulePtrVerified(granuleId); + for (auto&& [portionId, constructor] : pathConstructors) { + g->UpsertPortionOnLoad(constructor.Build(false)); + } + } for (auto&& i : GranulesStorage->GetTables()) { i.second->OnAfterPortionsLoad(); } @@ -413,18 +418,20 @@ std::vector<std::shared_ptr<TTTLColumnEngineChanges>> TColumnEngineForLogs::Star std::vector<std::shared_ptr<TTTLColumnEngineChanges>> result; for (auto&& i : context.GetTasks()) { for (auto&& t : i.second) { - SignalCounters.OnActualizationTask(t.GetTask()->GetPortionsToEvictCount(), t.GetTask()->PortionsToRemove.size()); + SignalCounters.OnActualizationTask(t.GetTask()->GetPortionsToEvictCount(), t.GetTask()->GetPortionsToRemoveSize()); result.emplace_back(t.GetTask()); } } return result; } -bool TColumnEngineForLogs::ApplyChanges(IDbWrapper& db, std::shared_ptr<TColumnEngineChanges> indexChanges, const TSnapshot& snapshot) noexcept { - { - TFinalizationContext context(LastGranule, LastPortion, snapshot); - indexChanges->Compile(context); - } +bool TColumnEngineForLogs::ApplyChangesOnTxCreate(std::shared_ptr<TColumnEngineChanges> indexChanges, const TSnapshot& snapshot) noexcept { + TFinalizationContext context(LastGranule, LastPortion, snapshot); + indexChanges->Compile(context); + return true; +} + +bool TColumnEngineForLogs::ApplyChangesOnExecute(IDbWrapper& db, std::shared_ptr<TColumnEngineChanges> /*indexChanges*/, const TSnapshot& snapshot) noexcept { db.WriteCounter(LAST_PORTION, LastPortion); db.WriteCounter(LAST_GRANULE, LastGranule); diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.h b/ydb/core/tx/columnshard/engines/column_engine_logs.h index d991a6878f7..017ef3cbe84 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.h +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.h @@ -114,8 +114,8 @@ public: void ReturnToIndexes(const THashMap<ui64, THashSet<ui64>>& portions) const { return GranulesStorage->ReturnToIndexes(portions); } - bool ApplyChanges(IDbWrapper& db, std::shared_ptr<TColumnEngineChanges> indexChanges, - const TSnapshot& snapshot) noexcept override; + virtual bool ApplyChangesOnTxCreate(std::shared_ptr<TColumnEngineChanges> indexChanges, const TSnapshot& snapshot) noexcept override; + virtual bool ApplyChangesOnExecute(IDbWrapper& db, std::shared_ptr<TColumnEngineChanges> indexChanges, const TSnapshot& snapshot) noexcept override; void RegisterSchemaVersion(const TSnapshot& snapshot, TIndexInfo&& info) override; void RegisterSchemaVersion(const TSnapshot& snapshot, const NKikimrSchemeOp::TColumnTableSchema& schema) override; diff --git a/ydb/core/tx/columnshard/engines/db_wrapper.cpp b/ydb/core/tx/columnshard/engines/db_wrapper.cpp index 535c79fd9e2..4bc8f9bf093 100644 --- a/ydb/core/tx/columnshard/engines/db_wrapper.cpp +++ b/ydb/core/tx/columnshard/engines/db_wrapper.cpp @@ -1,5 +1,6 @@ #include "defs.h" #include "db_wrapper.h" +#include "portions/constructor.h" #include <ydb/core/tx/columnshard/columnshard_schema.h> namespace NKikimr::NOlap { @@ -48,7 +49,7 @@ void TDbWrapper::WriteColumn(const NOlap::TPortionInfo& portion, const TColumnRe } using IndexColumns = NColumnShard::Schema::IndexColumns; auto removeSnapshot = portion.GetRemoveSnapshotOptional(); - db.Table<IndexColumns>().Key(0, portion.GetDeprecatedGranuleId(), row.ColumnId, + db.Table<IndexColumns>().Key(0, portion.GetPathId(), row.ColumnId, portion.GetMinSnapshotDeprecated().GetPlanStep(), portion.GetMinSnapshotDeprecated().GetTxId(), portion.GetPortion(), row.Chunk).Update( NIceDb::TUpdate<IndexColumns::XPlanStep>(removeSnapshot ? removeSnapshot->GetPlanStep() : 0), NIceDb::TUpdate<IndexColumns::XTxId>(removeSnapshot ? removeSnapshot->GetTxId() : 0), @@ -81,11 +82,11 @@ void TDbWrapper::ErasePortion(const NOlap::TPortionInfo& portion) { void TDbWrapper::EraseColumn(const NOlap::TPortionInfo& portion, const TColumnRecord& row) { NIceDb::TNiceDb db(Database); using IndexColumns = NColumnShard::Schema::IndexColumns; - db.Table<IndexColumns>().Key(0, portion.GetDeprecatedGranuleId(), row.ColumnId, + db.Table<IndexColumns>().Key(0, portion.GetPathId(), row.ColumnId, portion.GetMinSnapshotDeprecated().GetPlanStep(), portion.GetMinSnapshotDeprecated().GetTxId(), portion.GetPortion(), row.Chunk).Delete(); } -bool TDbWrapper::LoadColumns(const std::function<void(const NOlap::TPortionInfo&, const TColumnChunkLoadContext&)>& callback) { +bool TDbWrapper::LoadColumns(const std::function<void(NOlap::TPortionInfoConstructor&&, const TColumnChunkLoadContext&)>& callback) { NIceDb::TNiceDb db(Database); using IndexColumns = NColumnShard::Schema::IndexColumns; auto rowset = db.Table<IndexColumns>().Prefix(0).Select(); @@ -94,17 +95,15 @@ bool TDbWrapper::LoadColumns(const std::function<void(const NOlap::TPortionInfo& } while (!rowset.EndOfSet()) { - NOlap::TPortionInfo portion = NOlap::TPortionInfo::BuildEmpty(); - portion.SetPathId(rowset.GetValue<IndexColumns::PathId>()); - portion.SetPortion(rowset.GetValue<IndexColumns::Portion>()); - portion.SetMinSnapshotDeprecated(NOlap::TSnapshot(rowset.GetValue<IndexColumns::PlanStep>(), rowset.GetValue<IndexColumns::TxId>())); - portion.SetDeprecatedGranuleId(rowset.GetValue<IndexColumns::Granule>()); + NOlap::TSnapshot minSnapshot(rowset.GetValue<IndexColumns::PlanStep>(), rowset.GetValue<IndexColumns::TxId>()); + NOlap::TSnapshot removeSnapshot(rowset.GetValue<IndexColumns::XPlanStep>(), rowset.GetValue<IndexColumns::XTxId>()); - NOlap::TColumnChunkLoadContext chunkLoadContext(rowset, DsGroupSelector); - - portion.SetRemoveSnapshot(rowset.GetValue<IndexColumns::XPlanStep>(), rowset.GetValue<IndexColumns::XTxId>()); + NOlap::TPortionInfoConstructor constructor(rowset.GetValue<IndexColumns::PathId>(), rowset.GetValue<IndexColumns::Portion>()); + constructor.SetMinSnapshotDeprecated(minSnapshot); + constructor.SetRemoveSnapshot(removeSnapshot); - callback(portion, chunkLoadContext); + NOlap::TColumnChunkLoadContext chunkLoadContext(rowset, DsGroupSelector); + callback(std::move(constructor), chunkLoadContext); if (!rowset.Next()) { return false; @@ -113,7 +112,7 @@ bool TDbWrapper::LoadColumns(const std::function<void(const NOlap::TPortionInfo& return true; } -bool TDbWrapper::LoadPortions(const std::function<void(NOlap::TPortionInfo&&, const NKikimrTxColumnShard::TIndexPortionMeta&)>& callback) { +bool TDbWrapper::LoadPortions(const std::function<void(NOlap::TPortionInfoConstructor&&, const NKikimrTxColumnShard::TIndexPortionMeta&)>& callback) { NIceDb::TNiceDb db(Database); using IndexPortions = NColumnShard::Schema::IndexPortions; auto rowset = db.Table<IndexPortions>().Select(); @@ -122,9 +121,7 @@ bool TDbWrapper::LoadPortions(const std::function<void(NOlap::TPortionInfo&&, co } while (!rowset.EndOfSet()) { - NOlap::TPortionInfo portion = NOlap::TPortionInfo::BuildEmpty(); - portion.SetPathId(rowset.GetValue<IndexPortions::PathId>()); - portion.SetPortion(rowset.GetValue<IndexPortions::PortionId>()); + NOlap::TPortionInfoConstructor portion(rowset.GetValue<IndexPortions::PathId>(), rowset.GetValue<IndexPortions::PortionId>()); portion.SetSchemaVersion(rowset.GetValue<IndexPortions::SchemaVersion>()); portion.SetRemoveSnapshot(rowset.GetValue<IndexPortions::XPlanStep>(), rowset.GetValue<IndexPortions::XTxId>()); diff --git a/ydb/core/tx/columnshard/engines/db_wrapper.h b/ydb/core/tx/columnshard/engines/db_wrapper.h index 1f4990da1cc..d9435913bd1 100644 --- a/ydb/core/tx/columnshard/engines/db_wrapper.h +++ b/ydb/core/tx/columnshard/engines/db_wrapper.h @@ -21,6 +21,7 @@ class TIndexChunk; struct TGranuleRecord; class IColumnEngine; class TPortionInfo; +class TPortionInfoConstructor; class IDbWrapper { public: @@ -38,11 +39,11 @@ public: virtual void WriteColumn(const TPortionInfo& portion, const TColumnRecord& row, const ui32 firstPKColumnId) = 0; virtual void EraseColumn(const TPortionInfo& portion, const TColumnRecord& row) = 0; - virtual bool LoadColumns(const std::function<void(const TPortionInfo&, const TColumnChunkLoadContext&)>& callback) = 0; + virtual bool LoadColumns(const std::function<void(NOlap::TPortionInfoConstructor&&, const TColumnChunkLoadContext&)>& callback) = 0; virtual void WritePortion(const NOlap::TPortionInfo& portion) = 0; virtual void ErasePortion(const NOlap::TPortionInfo& portion) = 0; - virtual bool LoadPortions(const std::function<void(NOlap::TPortionInfo&&, const NKikimrTxColumnShard::TIndexPortionMeta&)>& callback) = 0; + virtual bool LoadPortions(const std::function<void(NOlap::TPortionInfoConstructor&&, const NKikimrTxColumnShard::TIndexPortionMeta&)>& callback) = 0; virtual void WriteIndex(const TPortionInfo& portion, const TIndexChunk& row) = 0; virtual void EraseIndex(const TPortionInfo& portion, const TIndexChunk& row) = 0; @@ -70,11 +71,11 @@ public: void WritePortion(const NOlap::TPortionInfo& portion) override; void ErasePortion(const NOlap::TPortionInfo& portion) override; - bool LoadPortions(const std::function<void(NOlap::TPortionInfo&&, const NKikimrTxColumnShard::TIndexPortionMeta&)>& callback) override; + bool LoadPortions(const std::function<void(NOlap::TPortionInfoConstructor&&, const NKikimrTxColumnShard::TIndexPortionMeta&)>& callback) override; void WriteColumn(const NOlap::TPortionInfo& portion, const TColumnRecord& row, const ui32 firstPKColumnId) override; void EraseColumn(const NOlap::TPortionInfo& portion, const TColumnRecord& row) override; - bool LoadColumns(const std::function<void(const NOlap::TPortionInfo&, const TColumnChunkLoadContext&)>& callback) override; + bool LoadColumns(const std::function<void(NOlap::TPortionInfoConstructor&&, const TColumnChunkLoadContext&)>& callback) override; virtual void WriteIndex(const TPortionInfo& portion, const TIndexChunk& row) override; virtual void EraseIndex(const TPortionInfo& portion, const TIndexChunk& row) override; diff --git a/ydb/core/tx/columnshard/engines/portions/column_record.h b/ydb/core/tx/columnshard/engines/portions/column_record.h index 9d958055cbf..7b4e54f5154 100644 --- a/ydb/core/tx/columnshard/engines/portions/column_record.h +++ b/ydb/core/tx/columnshard/engines/portions/column_record.h @@ -91,7 +91,7 @@ public: } void RegisterBlobIdx(const ui16 blobIdx) { -// AFL_VERIFY(!BlobRange.BlobId.GetTabletId())("original", BlobRange.BlobId.ToStringNew())("new", blobId.ToStringNew()); + AFL_VERIFY(!BlobRange.BlobIdx)("original", BlobRange.BlobIdx)("new", blobIdx); BlobRange.BlobIdx = blobIdx; } diff --git a/ydb/core/tx/columnshard/engines/portions/constructor.cpp b/ydb/core/tx/columnshard/engines/portions/constructor.cpp new file mode 100644 index 00000000000..f3de740f4f8 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/portions/constructor.cpp @@ -0,0 +1,87 @@ +#include "constructor.h" +#include <ydb/core/tx/columnshard/columnshard_schema.h> +#include <ydb/core/tx/columnshard/engines/scheme/index_info.h> +#include <ydb/core/tx/columnshard/engines/scheme/versions/abstract_scheme.h> +#include <ydb/core/tx/columnshard/engines/scheme/versions/versioned_index.h> + +namespace NKikimr::NOlap { + +TPortionInfo TPortionInfoConstructor::Build(const bool needChunksNormalization) { + TPortionInfo result(MetaConstructor.Build()); + AFL_VERIFY(PathId); + result.PathId = PathId; + result.Portion = GetPortionIdVerified(); + + AFL_VERIFY(MinSnapshotDeprecated); + AFL_VERIFY(MinSnapshotDeprecated->Valid()); + result.MinSnapshotDeprecated = *MinSnapshotDeprecated; + if (RemoveSnapshot) { + AFL_VERIFY(RemoveSnapshot->Valid()); + result.RemoveSnapshot = *RemoveSnapshot; + } + result.SchemaVersion = SchemaVersion; + + if (needChunksNormalization) { + ReorderChunks(); + } + FullValidation(); + + result.Indexes = Indexes; + result.Records = Records; + result.BlobIds = BlobIds; + return result; +} + +ISnapshotSchema::TPtr TPortionInfoConstructor::GetSchema(const TVersionedIndex& index) const { + if (SchemaVersion) { + auto schema = index.GetSchema(SchemaVersion.value()); + AFL_VERIFY(!!schema)("details", TStringBuilder() << "cannot find schema for version " << SchemaVersion.value()); + return schema; + } + AFL_VERIFY(MinSnapshotDeprecated); + return index.GetSchema(*MinSnapshotDeprecated); +} + +void TPortionInfoConstructor::LoadRecord(const TIndexInfo& indexInfo, const TColumnChunkLoadContext& loadContext) { + TColumnRecord rec(RegisterBlobId(loadContext.GetBlobRange().GetBlobId()), loadContext, indexInfo.GetColumnFeaturesVerified(loadContext.GetAddress().GetColumnId())); + Records.push_back(std::move(rec)); + + if (loadContext.GetPortionMeta()) { + AFL_VERIFY(MetaConstructor.LoadMetadata(*loadContext.GetPortionMeta(), indexInfo)); + } +} + +void TPortionInfoConstructor::LoadIndex(const TIndexChunkLoadContext& loadContext) { + const auto linkBlobId = RegisterBlobId(loadContext.GetBlobRange().GetBlobId()); + AddIndex(loadContext.BuildIndexChunk(linkBlobId)); +} + +const NKikimr::NOlap::TColumnRecord& TPortionInfoConstructor::AppendOneChunkColumn(TColumnRecord&& record) { + Y_ABORT_UNLESS(record.ColumnId); + std::optional<ui32> maxChunk; + for (auto&& i : Records) { + if (i.ColumnId == record.ColumnId) { + if (!maxChunk) { + maxChunk = i.Chunk; + } else { + Y_ABORT_UNLESS(*maxChunk + 1 == i.Chunk); + maxChunk = i.Chunk; + } + } + } + if (maxChunk) { + AFL_VERIFY(*maxChunk + 1 == record.Chunk)("max", *maxChunk)("record", record.Chunk); + } else { + AFL_VERIFY(0 == record.Chunk)("record", record.Chunk); + } + Records.emplace_back(std::move(record)); + return Records.back(); +} + +void TPortionInfoConstructor::AddMetadata(const ISnapshotSchema& snapshotSchema, const std::shared_ptr<arrow::RecordBatch>& batch) { + Y_ABORT_UNLESS(batch->num_rows() == GetRecordsCount()); + MetaConstructor.FillMetaInfo(NArrow::TFirstLastSpecialKeys(batch), + NArrow::TMinMaxSpecialKeys(batch, TIndexInfo::ArrowSchemaSnapshot()), snapshotSchema.GetIndexInfo()); +} + +}
\ No newline at end of file diff --git a/ydb/core/tx/columnshard/engines/portions/constructor.h b/ydb/core/tx/columnshard/engines/portions/constructor.h new file mode 100644 index 00000000000..d40380bdf4a --- /dev/null +++ b/ydb/core/tx/columnshard/engines/portions/constructor.h @@ -0,0 +1,333 @@ +#pragma once +#include "constructor_meta.h" +#include "column_record.h" +#include "index_chunk.h" +#include "portion_info.h" + +#include <ydb/library/accessor/accessor.h> + +namespace NKikimr::NOlap { +class TPortionInfo; +class TVersionedIndex; +class ISnapshotSchema; +class TIndexChunkLoadContext; + +class TPortionInfoConstructor { +private: + YDB_ACCESSOR(ui64, PathId, 0); + std::optional<ui64> PortionId; + + TPortionMetaConstructor MetaConstructor; + + std::optional<TSnapshot> MinSnapshotDeprecated; + std::optional<TSnapshot> RemoveSnapshot; + std::optional<ui64> SchemaVersion; + + std::vector<TIndexChunk> Indexes; + YDB_ACCESSOR_DEF(std::vector<TColumnRecord>, Records); + std::vector<TUnifiedBlobId> BlobIds; +public: + void SetPortionId(const ui64 value) { + AFL_VERIFY(value); + PortionId = value; + } + + void AddMetadata(const ISnapshotSchema& snapshotSchema, const std::shared_ptr<arrow::RecordBatch>& batch); + + void AddMetadata(const ISnapshotSchema& snapshotSchema, const NArrow::TFirstLastSpecialKeys& firstLastRecords, const NArrow::TMinMaxSpecialKeys& minMaxSpecial) { + MetaConstructor.FillMetaInfo(firstLastRecords, minMaxSpecial, snapshotSchema.GetIndexInfo()); + } + + ui64 GetPortionIdVerified() const { + AFL_VERIFY(PortionId); + AFL_VERIFY(*PortionId); + return *PortionId; + } + + TPortionMetaConstructor& MutableMeta() { + return MetaConstructor; + } + + const TPortionMetaConstructor& GetMeta() const { + return MetaConstructor; + } + + TPortionInfoConstructor(const TPortionInfo& portion, const bool withBlobs, const bool withMetadata) + : PathId(portion.GetPathId()) + , PortionId(portion.GetPortionId()) + , MinSnapshotDeprecated(portion.GetMinSnapshotDeprecated()) + , RemoveSnapshot(portion.GetRemoveSnapshotOptional()) + , SchemaVersion(portion.GetSchemaVersionOptional()) + { + if (withMetadata) { + MetaConstructor = TPortionMetaConstructor(portion.Meta); + } + if (withBlobs) { + Indexes = portion.GetIndexes(); + Records = portion.GetRecords(); + BlobIds = portion.BlobIds; + } + } + + TPortionInfoConstructor(TPortionInfo&& portion) + : PathId(portion.GetPathId()) + , PortionId(portion.GetPortionId()) + , MinSnapshotDeprecated(portion.GetMinSnapshotDeprecated()) + , RemoveSnapshot(portion.GetRemoveSnapshotOptional()) + , SchemaVersion(portion.GetSchemaVersionOptional()) + { + MetaConstructor = TPortionMetaConstructor(portion.Meta); + + Indexes = std::move(portion.Indexes); + Records = std::move(portion.Records); + BlobIds = std::move(portion.BlobIds); + } + + TPortionAddress GetAddress() const { + return TPortionAddress(PathId, GetPortionIdVerified()); + } + + bool HasRemoveSnapshot() const { + return !!RemoveSnapshot; + } + + template <class TChunkInfo> + static void CheckChunksOrder(const std::vector<TChunkInfo>& chunks) { + ui32 entityId = 0; + ui32 chunkIdx = 0; + for (auto&& i : chunks) { + if (entityId != i.GetEntityId()) { + AFL_VERIFY(entityId < i.GetEntityId()); + AFL_VERIFY(i.GetChunkIdx() == 0); + entityId = i.GetEntityId(); + chunkIdx = 0; + } else { + AFL_VERIFY(i.GetChunkIdx() == chunkIdx + 1); + chunkIdx = i.GetChunkIdx(); + } + AFL_VERIFY(i.GetEntityId()); + } + } + + void Merge(TPortionInfoConstructor&& item) { + AFL_VERIFY(item.PathId == PathId); + AFL_VERIFY(item.PortionId == PortionId); + if (item.MinSnapshotDeprecated) { + if (MinSnapshotDeprecated) { + AFL_VERIFY(*MinSnapshotDeprecated == *item.MinSnapshotDeprecated); + } else { + MinSnapshotDeprecated = item.MinSnapshotDeprecated; + } + } + if (item.RemoveSnapshot) { + if (RemoveSnapshot) { + AFL_VERIFY(*RemoveSnapshot == *item.RemoveSnapshot); + } else { + RemoveSnapshot = item.RemoveSnapshot; + } + } + } + + TPortionInfoConstructor(const ui64 pathId, const ui64 portionId) + : PathId(pathId) + , PortionId(portionId) { + AFL_VERIFY(PathId); + AFL_VERIFY(PortionId); + } + + TPortionInfoConstructor(const ui64 pathId) + : PathId(pathId) { + AFL_VERIFY(PathId); + } + + const TSnapshot& GetMinSnapshotDeprecatedVerified() const { + AFL_VERIFY(!!MinSnapshotDeprecated); + return *MinSnapshotDeprecated; + } + + std::shared_ptr<ISnapshotSchema> GetSchema(const TVersionedIndex& index) const; + + void SetMinSnapshotDeprecated(const TSnapshot& snap) { + Y_ABORT_UNLESS(snap.Valid()); + MinSnapshotDeprecated = snap; + } + + void SetSchemaVersion(const ui64 version) { +// AFL_VERIFY(version); engines/ut + SchemaVersion = version; + } + + void SetRemoveSnapshot(const TSnapshot& snap) { + AFL_VERIFY(!RemoveSnapshot); + if (snap.Valid()) { + RemoveSnapshot = snap; + } + } + + void SetRemoveSnapshot(const ui64 planStep, const ui64 txId) { + SetRemoveSnapshot(TSnapshot(planStep, txId)); + } + + void LoadRecord(const TIndexInfo& indexInfo, const TColumnChunkLoadContext& loadContext); + + ui32 GetRecordsCount() const { + ui32 result = 0; + std::optional<ui32> columnIdFirst; + for (auto&& i : Records) { + if (!columnIdFirst || *columnIdFirst == i.ColumnId) { + result += i.GetMeta().GetNumRows(); + columnIdFirst = i.ColumnId; + } + } + AFL_VERIFY(columnIdFirst); + return result; + } + + TBlobRangeLink16::TLinkId RegisterBlobId(const TUnifiedBlobId& blobId) { + AFL_VERIFY(blobId.IsValid()); + TBlobRangeLink16::TLinkId idx = 0; + for (auto&& i : BlobIds) { + if (i == blobId) { + return idx; + } + ++idx; + } + BlobIds.emplace_back(blobId); + return idx; + } + + const TBlobRange RestoreBlobRange(const TBlobRangeLink16& linkRange) const { + return linkRange.RestoreRange(GetBlobId(linkRange.GetBlobIdxVerified())); + } + + const TUnifiedBlobId& GetBlobId(const TBlobRangeLink16::TLinkId linkId) const { + AFL_VERIFY(linkId < BlobIds.size()); + return BlobIds[linkId]; + } + + ui32 GetBlobIdsCount() const { + return BlobIds.size(); + } + + void RegisterBlobIdx(const TChunkAddress& address, const TBlobRangeLink16::TLinkId blobIdx) { + for (auto&& i : Records) { + if (i.GetColumnId() == address.GetEntityId() && i.GetChunkIdx() == address.GetChunkIdx()) { + i.RegisterBlobIdx(blobIdx); + return; + } + } + for (auto&& i : Indexes) { + if (i.GetIndexId() == address.GetEntityId() && i.GetChunkIdx() == address.GetChunkIdx()) { + i.RegisterBlobIdx(blobIdx); + return; + } + } + AFL_VERIFY(false)("problem", "portion haven't address for blob registration")("address", address.DebugString()); + } + + TString DebugString() const { + TStringBuilder sb; + return sb; + } + + void ReorderChunks() { + { + auto pred = [](const TColumnRecord& l, const TColumnRecord& r) { + return l.GetAddress() < r.GetAddress(); + }; + std::sort(Records.begin(), Records.end(), pred); + CheckChunksOrder(Records); + } + { + auto pred = [](const TIndexChunk& l, const TIndexChunk& r) { + return l.GetAddress() < r.GetAddress(); + }; + std::sort(Indexes.begin(), Indexes.end(), pred); + CheckChunksOrder(Indexes); + } + } + + void FullValidation() const { + AFL_VERIFY(Records.size()); + CheckChunksOrder(Records); + CheckChunksOrder(Indexes); + std::set<ui32> blobIdxs; + for (auto&& i : Records) { + blobIdxs.emplace(i.GetBlobRange().GetBlobIdxVerified()); + } + for (auto&& i : Indexes) { + blobIdxs.emplace(i.GetBlobRange().GetBlobIdxVerified()); + } + if (BlobIds.size()) { + AFL_VERIFY(BlobIds.size() == blobIdxs.size()); + AFL_VERIFY(BlobIds.size() == *blobIdxs.rbegin() + 1); + } else { + AFL_VERIFY(blobIdxs.empty()); + } + } + + void LoadIndex(const TIndexChunkLoadContext& loadContext); + + const TColumnRecord& AppendOneChunkColumn(TColumnRecord&& record); + + void AddIndex(const TIndexChunk& chunk) { + ui32 chunkIdx = 0; + for (auto&& i : Indexes) { + if (i.GetIndexId() == chunk.GetIndexId()) { + AFL_VERIFY(chunkIdx == i.GetChunkIdx())("index_id", chunk.GetIndexId())("expected", chunkIdx)("real", i.GetChunkIdx()); + ++chunkIdx; + } + } + AFL_VERIFY(chunkIdx == chunk.GetChunkIdx())("index_id", chunk.GetIndexId())("expected", chunkIdx)("real", chunk.GetChunkIdx()); + Indexes.emplace_back(chunk); + } + + TPortionInfo Build(const bool needChunksNormalization); +}; + +class TPortionConstructors { +private: + THashMap<ui64, THashMap<ui64, TPortionInfoConstructor>> Constructors; +public: + THashMap<ui64, THashMap<ui64, TPortionInfoConstructor>>::iterator begin() { + return Constructors.begin(); + } + + THashMap<ui64, THashMap<ui64, TPortionInfoConstructor>>::iterator end() { + return Constructors.end(); + } + + TPortionInfoConstructor* GetConstructorVerified(const ui64 pathId, const ui64 portionId) { + auto itPathId = Constructors.find(pathId); + AFL_VERIFY(itPathId != Constructors.end()); + auto itPortionId = itPathId->second.find(portionId); + AFL_VERIFY(itPortionId != itPathId->second.end()); + return &itPortionId->second; + } + + TPortionInfoConstructor* AddConstructorVerified(TPortionInfoConstructor&& constructor) { + const ui64 pathId = constructor.GetPathId(); + const ui64 portionId = constructor.GetPortionIdVerified(); + auto info = Constructors[pathId].emplace(portionId, std::move(constructor)); + AFL_VERIFY(info.second); + return &info.first->second; + } + + TPortionInfoConstructor* MergeConstructor(TPortionInfoConstructor&& constructor) { + const ui64 pathId = constructor.GetPathId(); + const ui64 portionId = constructor.GetPortionIdVerified(); + auto itPathId = Constructors.find(pathId); + if (itPathId == Constructors.end()) { + return AddConstructorVerified(std::move(constructor)); + } + auto itPortionId = itPathId->second.find(portionId); + if (itPortionId == itPathId->second.end()) { + return AddConstructorVerified(std::move(constructor)); + } + itPortionId->second.Merge(std::move(constructor)); + return &itPortionId->second; + } + +}; + +}
\ No newline at end of file diff --git a/ydb/core/tx/columnshard/engines/portions/constructor_meta.cpp b/ydb/core/tx/columnshard/engines/portions/constructor_meta.cpp new file mode 100644 index 00000000000..3f88d1f0e1d --- /dev/null +++ b/ydb/core/tx/columnshard/engines/portions/constructor_meta.cpp @@ -0,0 +1,105 @@ +#include "constructor_meta.h" +#include <ydb/core/tx/columnshard/blobs_action/common/const.h> +#include <ydb/core/tx/columnshard/engines/scheme/index_info.h> + +namespace NKikimr::NOlap { + +void TPortionMetaConstructor::FillMetaInfo(const NArrow::TFirstLastSpecialKeys& primaryKeys, const NArrow::TMinMaxSpecialKeys& snapshotKeys, const TIndexInfo& indexInfo) { + AFL_VERIFY(!FirstAndLastPK); + FirstAndLastPK = *primaryKeys.BuildAccordingToSchemaVerified(indexInfo.GetReplaceKey()); + AFL_VERIFY(!RecordSnapshotMin); + AFL_VERIFY(!RecordSnapshotMax); + { + auto cPlanStep = snapshotKeys.GetBatch()->GetColumnByName(TIndexInfo::SPEC_COL_PLAN_STEP); + auto cTxId = snapshotKeys.GetBatch()->GetColumnByName(TIndexInfo::SPEC_COL_TX_ID); + Y_ABORT_UNLESS(cPlanStep && cTxId); + Y_ABORT_UNLESS(cPlanStep->type_id() == arrow::UInt64Type::type_id); + Y_ABORT_UNLESS(cTxId->type_id() == arrow::UInt64Type::type_id); + const arrow::UInt64Array& cPlanStepArray = static_cast<const arrow::UInt64Array&>(*cPlanStep); + const arrow::UInt64Array& cTxIdArray = static_cast<const arrow::UInt64Array&>(*cTxId); + RecordSnapshotMin = TSnapshot(cPlanStepArray.GetView(0), cTxIdArray.GetView(0)); + RecordSnapshotMax = TSnapshot(cPlanStepArray.GetView(snapshotKeys.GetBatch()->num_rows() - 1), cTxIdArray.GetView(snapshotKeys.GetBatch()->num_rows() - 1)); + } +} + +TPortionMetaConstructor::TPortionMetaConstructor(const TPortionMeta& meta) { + FirstAndLastPK = meta.ReplaceKeyEdges; + RecordSnapshotMin = meta.RecordSnapshotMin; + RecordSnapshotMax = meta.RecordSnapshotMax; + TierName = meta.GetTierNameOptional(); + if (!meta.StatisticsStorage.IsEmpty()) { + StatisticsStorage = meta.StatisticsStorage; + } + if (meta.Produced != NPortion::EProduced::UNSPECIFIED) { + Produced = meta.Produced; + } +} + +NKikimr::NOlap::TPortionMeta TPortionMetaConstructor::Build() { + AFL_VERIFY(FirstAndLastPK); + AFL_VERIFY(RecordSnapshotMin); + AFL_VERIFY(RecordSnapshotMax); + TPortionMeta result(*FirstAndLastPK, *RecordSnapshotMin, *RecordSnapshotMax); + if (TierName) { + result.TierName = *TierName; + } + AFL_VERIFY(Produced); + result.Produced = *Produced; + if (StatisticsStorage) { + result.StatisticsStorage = *StatisticsStorage; + } + return result; +} + +bool TPortionMetaConstructor::LoadMetadata(const NKikimrTxColumnShard::TIndexPortionMeta& portionMeta, const TIndexInfo& indexInfo) { + if (!!Produced) { + AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "DeserializeFromProto")("error", "parsing duplication"); + return true; + } + if (portionMeta.HasStatisticsStorage()) { + auto parsed = NStatistics::TPortionStorage::BuildFromProto(portionMeta.GetStatisticsStorage()); + if (!parsed) { + AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "DeserializeFromProto")("error", parsed.GetErrorMessage()); + return false; + } + StatisticsStorage = parsed.DetachResult(); + if (StatisticsStorage->IsEmpty()) { + StatisticsStorage.reset(); + } + } + if (portionMeta.GetTierName()) { + TierName = portionMeta.GetTierName(); + } + if (portionMeta.GetIsInserted()) { + Produced = TPortionMeta::EProduced::INSERTED; + } else if (portionMeta.GetIsCompacted()) { + Produced = TPortionMeta::EProduced::COMPACTED; + } else if (portionMeta.GetIsSplitCompacted()) { + Produced = TPortionMeta::EProduced::SPLIT_COMPACTED; + } else if (portionMeta.GetIsEvicted()) { + Produced = TPortionMeta::EProduced::EVICTED; + } else { + AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "DeserializeFromProto")("error", "incorrect portion meta")("meta", portionMeta.DebugString()); + return false; + } + AFL_VERIFY(Produced != TPortionMeta::EProduced::UNSPECIFIED); + AFL_VERIFY(portionMeta.HasPrimaryKeyBorders()); + FirstAndLastPK = NArrow::TFirstLastSpecialKeys(portionMeta.GetPrimaryKeyBorders(), indexInfo.GetReplaceKey()); + + AFL_VERIFY(portionMeta.HasRecordSnapshotMin()); + RecordSnapshotMin = TSnapshot(portionMeta.GetRecordSnapshotMin().GetPlanStep(), portionMeta.GetRecordSnapshotMin().GetTxId()); + AFL_VERIFY(portionMeta.HasRecordSnapshotMax()); + RecordSnapshotMax = TSnapshot(portionMeta.GetRecordSnapshotMax().GetPlanStep(), portionMeta.GetRecordSnapshotMax().GetTxId()); + return true; +} + +void TPortionMetaConstructor::SetTierName(const TString& tierName) { + AFL_VERIFY(!TierName); + if (!tierName || tierName == NBlobOperations::TGlobal::DefaultStorageId) { + TierName.reset(); + } else { + TierName = tierName; + } +} + +}
\ No newline at end of file diff --git a/ydb/core/tx/columnshard/engines/portions/constructor_meta.h b/ydb/core/tx/columnshard/engines/portions/constructor_meta.h new file mode 100644 index 00000000000..b9603606a2c --- /dev/null +++ b/ydb/core/tx/columnshard/engines/portions/constructor_meta.h @@ -0,0 +1,52 @@ +#pragma once +#include "meta.h" +#include <ydb/core/formats/arrow/special_keys.h> +#include <ydb/core/tx/columnshard/common/portion.h> +#include <ydb/core/tx/columnshard/common/snapshot.h> +#include <ydb/core/tx/columnshard/engines/scheme/statistics/abstract/portion_storage.h> + +namespace NKikimr::NOlap { +class TPortionInfoConstructor; +struct TIndexInfo; + +class TPortionMetaConstructor { +private: + std::optional<NArrow::TFirstLastSpecialKeys> FirstAndLastPK; + std::optional<TString> TierName; + std::optional<NStatistics::TPortionStorage> StatisticsStorage; + std::optional<TSnapshot> RecordSnapshotMin; + std::optional<TSnapshot> RecordSnapshotMax; + std::optional<NPortion::EProduced> Produced; + friend class TPortionInfoConstructor; + void FillMetaInfo(const NArrow::TFirstLastSpecialKeys& primaryKeys, const NArrow::TMinMaxSpecialKeys& snapshotKeys, const TIndexInfo& indexInfo); + +public: + TPortionMetaConstructor() = default; + TPortionMetaConstructor(const TPortionMeta& meta); + + void SetTierName(const TString& tierName); + void ResetTierName(const TString& tierName) { + TierName.reset(); + SetTierName(tierName); + } + + void SetStatisticsStorage(NStatistics::TPortionStorage&& storage) { + AFL_VERIFY(!StatisticsStorage); + StatisticsStorage = std::move(storage); + } + + void ResetStatisticsStorage(NStatistics::TPortionStorage&& storage) { + StatisticsStorage = std::move(storage); + } + + void UpdateRecordsMeta(const NPortion::EProduced prod) { + Produced = prod; + } + + TPortionMeta Build(); + + [[nodiscard]] bool LoadMetadata(const NKikimrTxColumnShard::TIndexPortionMeta& portionMeta, const TIndexInfo& indexInfo); + +}; + +}
\ No newline at end of file diff --git a/ydb/core/tx/columnshard/engines/portions/meta.cpp b/ydb/core/tx/columnshard/engines/portions/meta.cpp index 504ea4b3485..444172568d5 100644 --- a/ydb/core/tx/columnshard/engines/portions/meta.cpp +++ b/ydb/core/tx/columnshard/engines/portions/meta.cpp @@ -2,84 +2,18 @@ #include <ydb/core/formats/arrow/arrow_filter.h> #include <ydb/core/tx/columnshard/engines/scheme/index_info.h> +#include <ydb/core/tx/columnshard/blobs_action/common/const.h> #include <ydb/library/actors/core/log.h> namespace NKikimr::NOlap { -void TPortionMeta::FillBatchInfo(const NArrow::TFirstLastSpecialKeys& primaryKeys, const NArrow::TMinMaxSpecialKeys& snapshotKeys, const TIndexInfo& indexInfo) { - { - ReplaceKeyEdges = primaryKeys.BuildAccordingToSchemaVerified(indexInfo.GetReplaceKey()); - IndexKeyStart = ReplaceKeyEdges->GetFirst(); - IndexKeyEnd = ReplaceKeyEdges->GetLast(); - AFL_VERIFY(IndexKeyStart); - AFL_VERIFY(IndexKeyEnd); - AFL_VERIFY(*IndexKeyStart <= *IndexKeyEnd)("start", IndexKeyStart->DebugString())("end", IndexKeyEnd->DebugString()); - } - - { - auto cPlanStep = snapshotKeys.GetBatch()->GetColumnByName(TIndexInfo::SPEC_COL_PLAN_STEP); - auto cTxId = snapshotKeys.GetBatch()->GetColumnByName(TIndexInfo::SPEC_COL_TX_ID); - Y_ABORT_UNLESS(cPlanStep && cTxId); - Y_ABORT_UNLESS(cPlanStep->type_id() == arrow::UInt64Type::type_id); - Y_ABORT_UNLESS(cTxId->type_id() == arrow::UInt64Type::type_id); - const arrow::UInt64Array& cPlanStepArray = static_cast<const arrow::UInt64Array&>(*cPlanStep); - const arrow::UInt64Array& cTxIdArray = static_cast<const arrow::UInt64Array&>(*cTxId); - RecordSnapshotMin = TSnapshot(cPlanStepArray.GetView(0), cTxIdArray.GetView(0)); - RecordSnapshotMax = TSnapshot(cPlanStepArray.GetView(snapshotKeys.GetBatch()->num_rows() - 1), cTxIdArray.GetView(snapshotKeys.GetBatch()->num_rows() - 1)); - } -} - -bool TPortionMeta::DeserializeFromProto(const NKikimrTxColumnShard::TIndexPortionMeta& portionMeta, const TIndexInfo& indexInfo) { - if (Produced != TPortionMeta::EProduced::UNSPECIFIED) { - AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "DeserializeFromProto")("error", "parsing duplication"); - return true; - } - { - auto parsed = NStatistics::TPortionStorage::BuildFromProto(portionMeta.GetStatisticsStorage()); - if (!parsed) { - AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "DeserializeFromProto")("error", parsed.GetErrorMessage()); - return false; - } - StatisticsStorage = parsed.DetachResult(); - } - TierName = portionMeta.GetTierName(); - if (portionMeta.GetIsInserted()) { - Produced = TPortionMeta::EProduced::INSERTED; - } else if (portionMeta.GetIsCompacted()) { - Produced = TPortionMeta::EProduced::COMPACTED; - } else if (portionMeta.GetIsSplitCompacted()) { - Produced = TPortionMeta::EProduced::SPLIT_COMPACTED; - } else if (portionMeta.GetIsEvicted()) { - Produced = TPortionMeta::EProduced::EVICTED; - } else { - AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "DeserializeFromProto")("error", "incorrect portion meta")("meta", portionMeta.DebugString()); - return false; - } - Y_ABORT_UNLESS(Produced != TPortionMeta::EProduced::UNSPECIFIED); - - if (portionMeta.HasPrimaryKeyBorders()) { - ReplaceKeyEdges = std::make_shared<NArrow::TFirstLastSpecialKeys>(portionMeta.GetPrimaryKeyBorders(), indexInfo.GetReplaceKey()); - IndexKeyStart = ReplaceKeyEdges->GetFirst(); - IndexKeyEnd = ReplaceKeyEdges->GetLast(); - AFL_VERIFY(IndexKeyStart); - AFL_VERIFY(IndexKeyEnd); - AFL_VERIFY (*IndexKeyStart <= *IndexKeyEnd)("start", IndexKeyStart->DebugString())("end", IndexKeyEnd->DebugString()); - } - - if (portionMeta.HasRecordSnapshotMin()) { - RecordSnapshotMin = TSnapshot(portionMeta.GetRecordSnapshotMin().GetPlanStep(), portionMeta.GetRecordSnapshotMin().GetTxId()); - } - if (portionMeta.HasRecordSnapshotMax()) { - RecordSnapshotMax = TSnapshot(portionMeta.GetRecordSnapshotMax().GetPlanStep(), portionMeta.GetRecordSnapshotMax().GetTxId()); - } - return true; -} - NKikimrTxColumnShard::TIndexPortionMeta TPortionMeta::SerializeToProto() const { NKikimrTxColumnShard::TIndexPortionMeta portionMeta; portionMeta.SetTierName(TierName); - *portionMeta.MutableStatisticsStorage() = StatisticsStorage.SerializeToProto(); + if (!StatisticsStorage.IsEmpty()) { + *portionMeta.MutableStatisticsStorage() = StatisticsStorage.SerializeToProto(); + } switch (Produced) { case TPortionMeta::EProduced::UNSPECIFIED: Y_ABORT_UNLESS(false); @@ -101,18 +35,10 @@ NKikimrTxColumnShard::TIndexPortionMeta TPortionMeta::SerializeToProto() const { break; } - if (ReplaceKeyEdges) { - portionMeta.SetPrimaryKeyBorders(ReplaceKeyEdges->SerializeToStringDataOnlyNoCompression()); - } + portionMeta.SetPrimaryKeyBorders(ReplaceKeyEdges.SerializeToStringDataOnlyNoCompression()); - if (RecordSnapshotMin) { - portionMeta.MutableRecordSnapshotMin()->SetPlanStep(RecordSnapshotMin->GetPlanStep()); - portionMeta.MutableRecordSnapshotMin()->SetTxId(RecordSnapshotMin->GetTxId()); - } - if (RecordSnapshotMax) { - portionMeta.MutableRecordSnapshotMax()->SetPlanStep(RecordSnapshotMax->GetPlanStep()); - portionMeta.MutableRecordSnapshotMax()->SetTxId(RecordSnapshotMax->GetTxId()); - } + RecordSnapshotMin.SerializeToProto(*portionMeta.MutableRecordSnapshotMin()); + RecordSnapshotMax.SerializeToProto(*portionMeta.MutableRecordSnapshotMax()); return portionMeta; } @@ -126,6 +52,14 @@ TString TPortionMeta::DebugString() const { return sb; } +std::optional<TString> TPortionMeta::GetTierNameOptional() const { + if (TierName && TierName != NBlobOperations::TGlobal::DefaultStorageId) { + return TierName; + } else { + return std::nullopt; + } +} + TString TPortionAddress::DebugString() const { return TStringBuilder() << "(path_id=" << PathId << ";portion_id=" << PortionId << ")"; } diff --git a/ydb/core/tx/columnshard/engines/portions/meta.h b/ydb/core/tx/columnshard/engines/portions/meta.h index 69ee81b114d..4d0dc65b1f0 100644 --- a/ydb/core/tx/columnshard/engines/portions/meta.h +++ b/ydb/core/tx/columnshard/engines/portions/meta.h @@ -14,56 +14,42 @@ struct TIndexInfo; struct TPortionMeta { private: - std::shared_ptr<NArrow::TFirstLastSpecialKeys> ReplaceKeyEdges; // first and last PK rows - YDB_ACCESSOR_DEF(TString, TierName); + NArrow::TFirstLastSpecialKeys ReplaceKeyEdges; // first and last PK rows + YDB_READONLY_DEF(TString, TierName); YDB_READONLY_DEF(NStatistics::TPortionStorage, StatisticsStorage); + friend class TPortionMetaConstructor; + TPortionMeta(NArrow::TFirstLastSpecialKeys& pk, const TSnapshot& min, const TSnapshot& max) + : ReplaceKeyEdges(pk) + , IndexKeyStart(pk.GetFirst()) + , IndexKeyEnd(pk.GetLast()) + , RecordSnapshotMin(min) + , RecordSnapshotMax(max) + { + AFL_VERIFY(IndexKeyStart <= IndexKeyEnd)("start", IndexKeyStart.DebugString())("end", IndexKeyEnd.DebugString()); + } public: using EProduced = NPortion::EProduced; - std::optional<NArrow::TReplaceKey> IndexKeyStart; - std::optional<NArrow::TReplaceKey> IndexKeyEnd; + NArrow::TReplaceKey IndexKeyStart; + NArrow::TReplaceKey IndexKeyEnd; - std::optional<TSnapshot> RecordSnapshotMin; - std::optional<TSnapshot> RecordSnapshotMax; - EProduced Produced{EProduced::UNSPECIFIED}; - - ui64 GetMetadataMemorySize() const { - return sizeof(TPortionMeta) + ReplaceKeyEdges->GetMemorySize(); - } + TSnapshot RecordSnapshotMin; + TSnapshot RecordSnapshotMax; + EProduced Produced = EProduced::UNSPECIFIED; - void SetStatisticsStorage(NStatistics::TPortionStorage&& storage) { - AFL_VERIFY(StatisticsStorage.IsEmpty()); - StatisticsStorage = std::move(storage); - } + std::optional<TString> GetTierNameOptional() const; - void ResetStatisticsStorage(NStatistics::TPortionStorage&& storage) { - StatisticsStorage = std::move(storage); + ui64 GetMetadataMemorySize() const { + return sizeof(TPortionMeta) + ReplaceKeyEdges.GetMemorySize(); } - bool DeserializeFromProto(const NKikimrTxColumnShard::TIndexPortionMeta& portionMeta, const TIndexInfo& indexInfo); - NKikimrTxColumnShard::TIndexPortionMeta SerializeToProto() const; - void FillBatchInfo(const NArrow::TFirstLastSpecialKeys& primaryKeys, const NArrow::TMinMaxSpecialKeys& snapshotKeys, const TIndexInfo& indexInfo); - EProduced GetProduced() const { return Produced; } TString DebugString() const; - - friend IOutputStream& operator << (IOutputStream& out, const TPortionMeta& info) { - out << info.DebugString(); - return out; - } - - bool HasSnapshotMinMax() const { - return !!RecordSnapshotMax && !!RecordSnapshotMin; - } - - bool HasPrimaryKeyBorders() const { - return !!IndexKeyStart && !!IndexKeyEnd; - } }; class TPortionAddress { diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp index d717f03c60e..0466bdac04b 100644 --- a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp +++ b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp @@ -1,4 +1,5 @@ #include "portion_info.h" +#include "constructor.h" #include <ydb/core/tx/columnshard/blobs_reader/task.h> #include <ydb/core/tx/columnshard/data_sharing/protos/data.pb.h> #include <ydb/core/tx/columnshard/engines/column_engine.h> @@ -14,40 +15,6 @@ namespace NKikimr::NOlap { -const TColumnRecord& TPortionInfo::AppendOneChunkColumn(TColumnRecord&& record) { - Y_ABORT_UNLESS(record.ColumnId); - std::optional<ui32> maxChunk; - for (auto&& i : Records) { - if (i.ColumnId == record.ColumnId) { - if (!maxChunk) { - maxChunk = i.Chunk; - } else { - Y_ABORT_UNLESS(*maxChunk + 1 == i.Chunk); - maxChunk = i.Chunk; - } - } - } - if (maxChunk) { - AFL_VERIFY(*maxChunk + 1 == record.Chunk)("max", *maxChunk)("record", record.Chunk); - } else { - AFL_VERIFY(0 == record.Chunk)("record", record.Chunk); - } - Records.emplace_back(std::move(record)); - return Records.back(); -} - -void TPortionInfo::AddMetadata(const ISnapshotSchema& snapshotSchema, const std::shared_ptr<arrow::RecordBatch>& batch, const TString& tierName) { - Y_ABORT_UNLESS(batch->num_rows() == NumRows()); - AddMetadata(snapshotSchema, NArrow::TFirstLastSpecialKeys(NArrow::ExtractColumns(batch, snapshotSchema.GetIndexInfo().GetReplaceKey())), - NArrow::TMinMaxSpecialKeys(batch, TIndexInfo::ArrowSchemaSnapshot()), tierName); -} - -void TPortionInfo::AddMetadata(const ISnapshotSchema& snapshotSchema, const NArrow::TFirstLastSpecialKeys& primaryKeys, const NArrow::TMinMaxSpecialKeys& snapshotKeys, const TString& tierName) { - const auto& indexInfo = snapshotSchema.GetIndexInfo(); - Meta.FillBatchInfo(primaryKeys, snapshotKeys, indexInfo); - Meta.SetTierName(tierName); -} - std::shared_ptr<arrow::Scalar> TPortionInfo::MaxValue(ui32 columnId) const { std::shared_ptr<arrow::Scalar> result; for (auto&& i : Records) { @@ -130,18 +97,6 @@ TString TPortionInfo::DebugString(const bool withDetails) const { return sb << ")"; } -void TPortionInfo::LoadMetadata(const NKikimrTxColumnShard::TIndexPortionMeta& portionMeta, const TIndexInfo& indexInfo) { - AFL_VERIFY(Meta.DeserializeFromProto(portionMeta, indexInfo)); -} - -void TPortionInfo::AddRecord(const TIndexInfo& indexInfo, const TColumnRecord& rec, const NKikimrTxColumnShard::TIndexPortionMeta* portionMeta) { - Records.push_back(rec); - - if (portionMeta) { - AFL_VERIFY(Meta.DeserializeFromProto(*portionMeta, indexInfo)); - } -} - std::vector<const NKikimr::NOlap::TColumnRecord*> TPortionInfo::GetColumnChunksPointers(const ui32 columnId) const { std::vector<const TColumnRecord*> result; for (auto&& c : Records) { @@ -154,21 +109,6 @@ std::vector<const NKikimr::NOlap::TColumnRecord*> TPortionInfo::GetColumnChunksP return result; } -bool TPortionInfo::IsEqualWithSnapshots(const TPortionInfo& item) const { - return PathId == item.PathId && MinSnapshotDeprecated == item.MinSnapshotDeprecated - && Portion == item.Portion && RemoveSnapshot == item.RemoveSnapshot; -} - -bool TPortionInfo::TakeSnapshots(const TPortionInfo& item) { - if (MinSnapshotDeprecated.Valid()) { - return PathId == item.PathId && MinSnapshotDeprecated == item.MinSnapshotDeprecated - && Portion == item.Portion && RemoveSnapshot == item.RemoveSnapshot; - } else { - MinSnapshotDeprecated = item.MinSnapshotDeprecated; - return PathId == item.PathId && Portion == item.Portion && RemoveSnapshot == item.RemoveSnapshot; - } -} - void TPortionInfo::RemoveFromDatabase(IDbWrapper& db) const { for (auto& record : Records) { db.EraseColumn(*this, record); @@ -316,9 +256,6 @@ TConclusionStatus TPortionInfo::DeserializeFromProto(const NKikimrColumnShardDat return parse; } } - if (!Meta.DeserializeFromProto(proto.GetMeta(), info)) { - return TConclusionStatus::Fail("cannot parse meta"); - } for (auto&& i : proto.GetRecords()) { auto parse = TColumnRecord::BuildFromProto(i, info.GetColumnFeaturesVerified(i.GetColumnId())); if (!parse) { @@ -337,7 +274,11 @@ TConclusionStatus TPortionInfo::DeserializeFromProto(const NKikimrColumnShardDat } TConclusion<TPortionInfo> TPortionInfo::BuildFromProto(const NKikimrColumnShardDataSharingProto::TPortionInfo& proto, const TIndexInfo& info) { - TPortionInfo result; + TPortionMetaConstructor constructor; + if (!constructor.LoadMetadata(proto.GetMeta(), info)) { + return TConclusionStatus::Fail("cannot parse meta"); + } + TPortionInfo result(constructor.Build()); auto parse = result.DeserializeFromProto(proto, info); if (!parse) { return parse; @@ -466,19 +407,6 @@ void TPortionInfo::FillBlobIdsByStorage(THashMap<TString, THashSet<TUnifiedBlobI return FillBlobIdsByStorage(result, schema->GetIndexInfo()); } -TBlobRangeLink16::TLinkId TPortionInfo::RegisterBlobId(const TUnifiedBlobId& blobId) { - AFL_VERIFY(blobId.IsValid()); - TBlobRangeLink16::TLinkId idx = 0; - for (auto&& i : BlobIds) { - if (i == blobId) { - return idx; - } - ++idx; - } - BlobIds.emplace_back(blobId); - return idx; -} - THashMap<TString, THashMap<TUnifiedBlobId, std::vector<std::shared_ptr<IPortionDataChunk>>>> TPortionInfo::RestoreEntityChunks(NBlobOperations::NRead::TCompositeReadBlobs& blobs, const TIndexInfo& indexInfo) const { THashMap<TString, THashMap<TUnifiedBlobId, std::vector<std::shared_ptr<IPortionDataChunk>>>> result; for (auto&& c : GetRecords()) { @@ -719,6 +647,15 @@ public: }; } +ISnapshotSchema::TPtr TPortionInfo::TSchemaCursor::GetSchema(const TPortionInfoConstructor& portion) { + if (!CurrentSchema || portion.GetMinSnapshotDeprecatedVerified() != LastSnapshot) { + CurrentSchema = portion.GetSchema(VersionedIndex); + LastSnapshot = portion.GetMinSnapshotDeprecatedVerified(); + } + AFL_VERIFY(!!CurrentSchema); + return CurrentSchema; +} + NArrow::NAccessor::IChunkedArray::TCurrentChunkAddress TDeserializeChunkedArray::DoGetChunk(const std::optional<TCurrentChunkAddress>& chunkCurrent, const ui64 position) const { TChunkAccessor accessor(Chunks, Loader); return SelectChunk(chunkCurrent, position, accessor); diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.h b/ydb/core/tx/columnshard/engines/portions/portion_info.h index 86c5617f067..966d6363941 100644 --- a/ydb/core/tx/columnshard/engines/portions/portion_info.h +++ b/ydb/core/tx/columnshard/engines/portions/portion_info.h @@ -22,6 +22,7 @@ namespace NKikimr::NOlap { namespace NBlobOperations::NRead { class TCompositeReadBlobs; } +class TPortionInfoConstructor; struct TIndexInfo; class TVersionedIndex; @@ -99,6 +100,8 @@ public: } }; +class TPortionInfoConstructor; + class TPortionInfo { public: using TRuntimeFeatures = ui8; @@ -106,7 +109,12 @@ public: Optimized = 1 /* "optimized" */ }; private: - TPortionInfo() = default; + friend class TPortionInfoConstructor; + TPortionInfo(TPortionMeta&& meta) + : Meta(std::move(meta)) + { + + } ui64 PathId = 0; ui64 Portion = 0; // Id of independent (overlayed by PK) portion of data in pathId TSnapshot MinSnapshotDeprecated = TSnapshot::Zero(); // {PlanStep, TxId} is min snapshot for {Granule, Portion} @@ -114,7 +122,6 @@ private: std::optional<ui64> SchemaVersion; TPortionMeta Meta; - ui64 DeprecatedGranuleId = 0; YDB_READONLY_DEF(std::vector<TIndexChunk>, Indexes); YDB_READONLY(TRuntimeFeatures, RuntimeFeatures, 0); std::vector<TUnifiedBlobId> BlobIds; @@ -165,6 +172,15 @@ private: public: ui64 GetMinMemoryForReadColumns(const std::optional<std::set<ui32>>& columnIds) const; + void SetRemoveSnapshot(const TSnapshot& snap) { + AFL_VERIFY(!RemoveSnapshot.Valid()); + RemoveSnapshot = snap; + } + + void SetRemoveSnapshot(const ui64 planStep, const ui64 txId) { + SetRemoveSnapshot(TSnapshot(planStep, txId)); + } + void InitRuntimeFeature(const ERuntimeFeature feature, const bool activity) { if (activity) { AddRuntimeFeature(feature); @@ -181,11 +197,6 @@ public: RuntimeFeatures &= (Max<TRuntimeFeatures>() - (TRuntimeFeatures)feature); } - void OnAfterLoad() const { - CheckChunksOrder(Records); - CheckChunksOrder(Indexes); - } - bool HasRuntimeFeature(const ERuntimeFeature feature) const { if (feature == ERuntimeFeature::Optimized) { if ((RuntimeFeatures & (TRuntimeFeatures)feature)) { @@ -230,10 +241,6 @@ public: THashMap<TChunkAddress, TString> DecodeBlobAddresses(NBlobOperations::NRead::TCompositeReadBlobs&& blobs, const TIndexInfo& indexInfo) const; - void SetStatisticsStorage(NStatistics::TPortionStorage&& storage) { - Meta.SetStatisticsStorage(std::move(storage)); - } - const TString& GetColumnStorageId(const ui32 columnId, const TIndexInfo& indexInfo) const; const TString& GetEntityStorageId(const ui32 entityId, const TIndexInfo& indexInfo) const; @@ -277,40 +284,10 @@ public: return PathId; } - TBlobRangeLink16::TLinkId RegisterBlobId(const TUnifiedBlobId& blobId); - - void RegisterBlobIdx(const TChunkAddress& address, const TBlobRangeLink16::TLinkId blobIdx) { - for (auto it = Records.begin(); it != Records.end(); ++it) { - if (it->ColumnId == address.GetEntityId() && it->Chunk == address.GetChunkIdx()) { - it->RegisterBlobIdx(blobIdx); - return; - } - } - for (auto it = Indexes.begin(); it != Indexes.end(); ++it) { - if (it->GetIndexId() == address.GetEntityId() && it->GetChunkIdx() == address.GetChunkIdx()) { - it->RegisterBlobIdx(blobIdx); - return; - } - } - AFL_VERIFY(false)("problem", "portion haven't address for blob registration")("address", address.DebugString()); - } - void RemoveFromDatabase(IDbWrapper& db) const; void SaveToDatabase(IDbWrapper& db, const ui32 firstPKColumnId, const bool saveOnlyMeta) const; - void AddIndex(const TIndexChunk& chunk) { - ui32 chunkIdx = 0; - for (auto&& i : Indexes) { - if (i.GetIndexId() == chunk.GetIndexId()) { - AFL_VERIFY(chunkIdx == i.GetChunkIdx())("index_id", chunk.GetIndexId())("expected", chunkIdx)("real", i.GetChunkIdx()); - ++chunkIdx; - } - } - AFL_VERIFY(chunkIdx == chunk.GetChunkIdx())("index_id", chunk.GetIndexId())("expected", chunkIdx)("real", chunk.GetChunkIdx()); - Indexes.emplace_back(chunk); - } - bool OlderThen(const TPortionInfo& info) const { return RecordSnapshotMin() < info.RecordSnapshotMin(); } @@ -376,10 +353,6 @@ public: return result; } - void ResetMeta() { - Meta = TPortionMeta(); - } - const TPortionMeta& GetMeta() const { return Meta; } @@ -427,7 +400,7 @@ public: bool Empty() const { return Records.empty(); } bool Produced() const { return Meta.GetProduced() != TPortionMeta::EProduced::UNSPECIFIED; } - bool Valid() const { return ValidSnapshotInfo() && !Empty() && Produced() && Meta.IndexKeyStart && Meta.IndexKeyEnd; } + bool Valid() const { return ValidSnapshotInfo() && !Empty() && Produced(); } bool ValidSnapshotInfo() const { return MinSnapshotDeprecated.Valid() && PathId && Portion; } bool IsInserted() const { return Meta.GetProduced() == TPortionMeta::EProduced::INSERTED; } bool IsEvicted() const { return Meta.GetProduced() == TPortionMeta::EProduced::EVICTED; } @@ -435,28 +408,6 @@ public: bool CanIntersectOthers() const { return !Valid() || IsInserted() || IsEvicted(); } size_t NumChunks() const { return Records.size(); } - TPortionInfo CopyBeforeChunksRebuild() const { - TPortionInfo result = *this; - result.Records.clear(); - result.Indexes.clear(); - result.BlobIds.clear(); - return result; - } - - bool IsEqualWithSnapshots(const TPortionInfo& item) const; - bool TakeSnapshots(const TPortionInfo& item); - - static TPortionInfo BuildEmpty() { - return TPortionInfo(); - } - - TPortionInfo(const ui64 pathId, const ui64 portionId, const ui64 schemaVersion, const TSnapshot& minSnapshot) - : PathId(pathId) - , Portion(portionId) - , MinSnapshotDeprecated(minSnapshot) - , SchemaVersion(schemaVersion) { - } - TString DebugString(const bool withDetails = false) const; bool HasRemoveSnapshot() const { @@ -479,19 +430,10 @@ public: return HasRemoveSnapshot(); } - bool AllowEarlyFilter() const { - return Meta.GetProduced() == TPortionMeta::EProduced::COMPACTED - || Meta.GetProduced() == TPortionMeta::EProduced::SPLIT_COMPACTED; - } - ui64 GetPortion() const { return Portion; } - ui64 GetDeprecatedGranuleId() const { - return DeprecatedGranuleId; - } - TPortionAddress GetAddress() const { return TPortionAddress(PathId, Portion); } @@ -504,9 +446,6 @@ public: Portion = portion; } - void SetDeprecatedGranuleId(const ui64 granuleId) { - DeprecatedGranuleId = granuleId; - } const TSnapshot& GetMinSnapshotDeprecated() const { return MinSnapshotDeprecated; @@ -530,26 +469,8 @@ public: return SchemaVersion.value(); } - void SetMinSnapshotDeprecated(const TSnapshot& snap) { - Y_ABORT_UNLESS(snap.Valid()); - MinSnapshotDeprecated = snap; - } - - void SetSchemaVersion(const ui64 version) { - AFL_VERIFY(version); - SchemaVersion = version; - } - - void SetRemoveSnapshot(const TSnapshot& snap) { - const bool wasValid = RemoveSnapshot.Valid(); - Y_ABORT_UNLESS(!wasValid || snap.Valid()); - RemoveSnapshot = snap; - } - - void SetRemoveSnapshot(const ui64 planStep, const ui64 txId) { - const bool wasValid = RemoveSnapshot.Valid(); - RemoveSnapshot = TSnapshot(planStep, txId); - Y_ABORT_UNLESS(!wasValid || RemoveSnapshot.Valid()); + std::optional<ui64> GetSchemaVersionOptional() const { + return SchemaVersion; } bool IsVisible(const TSnapshot& snapshot) const { @@ -566,38 +487,22 @@ public: return visible; } - void UpdateRecordsMeta(TPortionMeta::EProduced produced) { - Meta.Produced = produced; - } - - void AddRecord(const TIndexInfo& indexInfo, const TColumnRecord& rec, const NKikimrTxColumnShard::TIndexPortionMeta* portionMeta); - - void LoadMetadata(const NKikimrTxColumnShard::TIndexPortionMeta& portionMeta, const TIndexInfo& indexInfo); - void AddMetadata(const ISnapshotSchema& snapshotSchema, const std::shared_ptr<arrow::RecordBatch>& batch, - const TString& tierName); - void AddMetadata(const ISnapshotSchema& snapshotSchema, const NArrow::TFirstLastSpecialKeys& primaryKeys, const NArrow::TMinMaxSpecialKeys& snapshotKeys, - const TString& tierName); - std::shared_ptr<arrow::Scalar> MaxValue(ui32 columnId) const; const NArrow::TReplaceKey& IndexKeyStart() const { - Y_ABORT_UNLESS(Meta.IndexKeyStart); - return *Meta.IndexKeyStart; + return Meta.IndexKeyStart; } const NArrow::TReplaceKey& IndexKeyEnd() const { - Y_ABORT_UNLESS(Meta.IndexKeyEnd); - return *Meta.IndexKeyEnd; + return Meta.IndexKeyEnd; } const TSnapshot& RecordSnapshotMin() const { - Y_ABORT_UNLESS(Meta.RecordSnapshotMin); - return *Meta.RecordSnapshotMin; + return Meta.RecordSnapshotMin; } const TSnapshot& RecordSnapshotMax() const { - Y_ABORT_UNLESS(Meta.RecordSnapshotMax); - return *Meta.RecordSnapshotMax; + return Meta.RecordSnapshotMax; } @@ -616,10 +521,12 @@ public: : VersionedIndex(versionedIndex) {} + ISnapshotSchema::TPtr GetSchema(const TPortionInfoConstructor& portion); + ISnapshotSchema::TPtr GetSchema(const TPortionInfo& portion) { if (!CurrentSchema || portion.MinSnapshotDeprecated != LastSnapshot) { CurrentSchema = portion.GetSchema(VersionedIndex); - LastSnapshot = portion.GetMinSnapshotDeprecated(); + LastSnapshot = portion.MinSnapshotDeprecated; } AFL_VERIFY(!!CurrentSchema)("portion", portion.DebugString()); return CurrentSchema; @@ -881,8 +788,6 @@ public: return batch; } - const TColumnRecord& AppendOneChunkColumn(TColumnRecord&& record); - friend IOutputStream& operator << (IOutputStream& out, const TPortionInfo& info) { out << info.DebugString(); return out; diff --git a/ydb/core/tx/columnshard/engines/portions/read_with_blobs.cpp b/ydb/core/tx/columnshard/engines/portions/read_with_blobs.cpp index 09b98597378..3c7e700e7b4 100644 --- a/ydb/core/tx/columnshard/engines/portions/read_with_blobs.cpp +++ b/ydb/core/tx/columnshard/engines/portions/read_with_blobs.cpp @@ -1,11 +1,9 @@ #include "read_with_blobs.h" #include "write_with_blobs.h" #include <ydb/core/tx/columnshard/engines/scheme/index_info.h> -#include <ydb/core/tx/columnshard/engines/scheme/filtered_scheme.h> -#include <ydb/core/tx/columnshard/engines/column_engine.h> -#include <ydb/core/tx/columnshard/blobs_reader/task.h> -#include <ydb/core/tx/columnshard/splitter/batch_slice.h> +#include <ydb/core/tx/columnshard/engines/scheme/versions/filtered_scheme.h> #include <ydb/core/tx/columnshard/hooks/abstract/abstract.h> +#include <ydb/core/tx/columnshard/splitter/batch_slice.h> namespace NKikimr::NOlap { @@ -151,10 +149,10 @@ std::optional<TWritePortionInfoWithBlobs> TReadPortionInfoWithBlobs::SyncPortion auto schemaTo = std::make_shared<TDefaultSchemaDetails>(to, std::make_shared<TSerializationStats>()); TGeneralSerializedSlice slice(entityChunksNew, schemaTo, counters); const NSplitter::TEntityGroups groups = to->GetIndexInfo().GetEntityGroupsByStorageId(targetTier, *storages); - TWritePortionInfoWithBlobs result = TWritePortionInfoWithBlobs::BuildByBlobs(slice.GroupChunksByBlobs(groups), source.PortionInfo, storages); - result.GetPortionInfo().SetMinSnapshotDeprecated(to->GetSnapshot()); - result.GetPortionInfo().SetSchemaVersion(to->GetVersion()); - result.GetPortionInfo().MutableMeta().SetTierName(targetTier); + TPortionInfoConstructor constructor(source.PortionInfo, false, true); + constructor.SetMinSnapshotDeprecated(to->GetSnapshot()); + constructor.SetSchemaVersion(to->GetVersion()); + constructor.MutableMeta().ResetTierName(targetTier); NStatistics::TPortionStorage storage; for (auto&& i : to->GetIndexInfo().GetStatisticsByName()) { @@ -165,7 +163,9 @@ std::optional<TWritePortionInfoWithBlobs> TReadPortionInfoWithBlobs::SyncPortion i.second->FillStatisticsData(entityChunksNew, storage, to->GetIndexInfo()); } } - result.MutablePortionInfo().MutableMeta().ResetStatisticsStorage(std::move(storage)); + constructor.MutableMeta().ResetStatisticsStorage(std::move(storage)); + + TWritePortionInfoWithBlobs result = TWritePortionInfoWithBlobs::BuildByBlobs(slice.GroupChunksByBlobs(groups), std::move(constructor), storages); return result; } diff --git a/ydb/core/tx/columnshard/engines/portions/read_with_blobs.h b/ydb/core/tx/columnshard/engines/portions/read_with_blobs.h index 2c8553f9884..6d688db6607 100644 --- a/ydb/core/tx/columnshard/engines/portions/read_with_blobs.h +++ b/ydb/core/tx/columnshard/engines/portions/read_with_blobs.h @@ -1,13 +1,13 @@ #pragma once #include "base_with_blobs.h" +#include "common.h" #include "portion_info.h" -#include <ydb/core/tx/columnshard/blob.h> -#include <ydb/core/tx/columnshard/splitter/blob_info.h> -#include <ydb/core/tx/columnshard/splitter/chunks.h> -#include <ydb/core/tx/columnshard/engines/scheme/statistics/abstract/common.h> -#include <ydb/core/tx/columnshard/engines/scheme/statistics/abstract/operator.h> + +#include <ydb/core/tx/columnshard/splitter/abstract/chunks.h> #include <ydb/library/accessor/accessor.h> +#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h> +#include <map> namespace NKikimr::NOlap { diff --git a/ydb/core/tx/columnshard/engines/portions/write_with_blobs.cpp b/ydb/core/tx/columnshard/engines/portions/write_with_blobs.cpp index 99929745437..4fbc7cb3305 100644 --- a/ydb/core/tx/columnshard/engines/portions/write_with_blobs.cpp +++ b/ydb/core/tx/columnshard/engines/portions/write_with_blobs.cpp @@ -1,10 +1,5 @@ #include "write_with_blobs.h" #include <ydb/core/tx/columnshard/engines/scheme/index_info.h> -#include <ydb/core/tx/columnshard/engines/scheme/filtered_scheme.h> -#include <ydb/core/tx/columnshard/engines/column_engine.h> -#include <ydb/core/tx/columnshard/blobs_reader/task.h> -#include <ydb/core/tx/columnshard/splitter/batch_slice.h> -#include <ydb/core/tx/columnshard/hooks/abstract/abstract.h> namespace NKikimr::NOlap { @@ -19,25 +14,27 @@ void TWritePortionInfoWithBlobs::TBlobInfo::AddChunk(TWritePortionInfoWithBlobs& Y_ABORT_UNLESS(Chunks.emplace(chunk->GetChunkAddressVerified(), chunk).second); ChunksOrdered.emplace_back(chunk); - chunk->AddIntoPortionBeforeBlob(bRange, owner.PortionInfo); + chunk->AddIntoPortionBeforeBlob(bRange, owner.GetPortionConstructor()); } void TWritePortionInfoWithBlobs::TBlobInfo::RegisterBlobId(TWritePortionInfoWithBlobs& owner, const TUnifiedBlobId& blobId) { - const TBlobRangeLink16::TLinkId idx = owner.PortionInfo.RegisterBlobId(blobId); + const TBlobRangeLink16::TLinkId idx = owner.GetPortionConstructor().RegisterBlobId(blobId); for (auto&& i : Chunks) { - owner.PortionInfo.RegisterBlobIdx(i.first, idx); + owner.GetPortionConstructor().RegisterBlobIdx(i.first, idx); } } TWritePortionInfoWithBlobs TWritePortionInfoWithBlobs::BuildByBlobs(std::vector<TSplittedBlob>&& chunks, const ui64 granule, const ui64 schemaVersion, const TSnapshot& snapshot, const std::shared_ptr<IStoragesManager>& operators) { - return BuildByBlobs(std::move(chunks), TPortionInfo(granule, 0, schemaVersion, snapshot), operators); + TPortionInfoConstructor constructor(granule); + constructor.SetMinSnapshotDeprecated(snapshot); + constructor.SetSchemaVersion(schemaVersion); + return BuildByBlobs(std::move(chunks), std::move(constructor), operators); } -TWritePortionInfoWithBlobs TWritePortionInfoWithBlobs::BuildByBlobs(std::vector<TSplittedBlob>&& chunks, const TPortionInfo& basePortion, - const std::shared_ptr<IStoragesManager>& operators) { - TWritePortionInfoWithBlobs result(basePortion.CopyBeforeChunksRebuild()); +TWritePortionInfoWithBlobs TWritePortionInfoWithBlobs::BuildByBlobs(std::vector<TSplittedBlob>&& chunks, TPortionInfoConstructor&& constructor, const std::shared_ptr<IStoragesManager>& operators) { + TWritePortionInfoWithBlobs result(std::move(constructor)); for (auto&& blob : chunks) { auto storage = operators->GetOperatorVerified(blob.GetGroupName()); auto blobInfo = result.StartBlob(storage); @@ -45,7 +42,6 @@ TWritePortionInfoWithBlobs TWritePortionInfoWithBlobs::BuildByBlobs(std::vector< blobInfo.AddChunk(chunk); } } - result.GetPortionInfo().ReorderChunks(); return result; } @@ -75,7 +71,7 @@ void TWritePortionInfoWithBlobs::FillStatistics(const TIndexInfo& index) { } i.second->FillStatisticsData(data, storage, index); } - PortionInfo.SetStatisticsStorage(std::move(storage)); + GetPortionConstructor().MutableMeta().SetStatisticsStorage(std::move(storage)); } } diff --git a/ydb/core/tx/columnshard/engines/portions/write_with_blobs.h b/ydb/core/tx/columnshard/engines/portions/write_with_blobs.h index c84d97bd06c..39c3bb885fe 100644 --- a/ydb/core/tx/columnshard/engines/portions/write_with_blobs.h +++ b/ydb/core/tx/columnshard/engines/portions/write_with_blobs.h @@ -1,13 +1,11 @@ #pragma once #include "base_with_blobs.h" -#include "portion_info.h" -#include <ydb/core/tx/columnshard/blob.h> -#include <ydb/core/tx/columnshard/splitter/blob_info.h> -#include <ydb/core/tx/columnshard/splitter/chunks.h> -#include <ydb/core/tx/columnshard/engines/scheme/statistics/abstract/common.h> -#include <ydb/core/tx/columnshard/engines/scheme/statistics/abstract/operator.h> +#include "constructor.h" #include <ydb/library/accessor/accessor.h> +#include <ydb/core/tx/columnshard/blobs_action/abstract/storages_manager.h> +#include <ydb/core/tx/columnshard/common/snapshot.h> +#include <ydb/core/tx/columnshard/splitter/blob_info.h> namespace NKikimr::NOlap { @@ -63,15 +61,12 @@ public: void RegisterBlobId(TWritePortionInfoWithBlobs& owner, const TUnifiedBlobId& blobId); }; private: - TPortionInfo PortionInfo; + std::optional<TPortionInfoConstructor> PortionConstructor; + std::optional<TPortionInfo> PortionResult; YDB_READONLY_DEF(std::vector<TBlobInfo>, Blobs); - explicit TWritePortionInfoWithBlobs(TPortionInfo&& portionInfo) - : PortionInfo(std::move(portionInfo)) { - } - - explicit TWritePortionInfoWithBlobs(const TPortionInfo& portionInfo) - : PortionInfo(portionInfo) { + explicit TWritePortionInfoWithBlobs(TPortionInfoConstructor&& portionConstructor) + : PortionConstructor(std::move(portionConstructor)) { } TBlobInfo::TBuilder StartBlob(const std::shared_ptr<IBlobsStorageOperator>& bOperator) { @@ -80,10 +75,6 @@ private: } public: - TPortionInfo& MutablePortionInfo() { - return PortionInfo; - } - std::vector<std::shared_ptr<IPortionDataChunk>> GetEntityChunks(const ui32 entityId) const; void FillStatistics(const TIndexInfo& index); @@ -91,8 +82,8 @@ public: static TWritePortionInfoWithBlobs BuildByBlobs(std::vector<TSplittedBlob>&& chunks, const ui64 granule, const ui64 schemaVersion, const TSnapshot& snapshot, const std::shared_ptr<IStoragesManager>& operators); - static TWritePortionInfoWithBlobs BuildByBlobs(std::vector<TSplittedBlob>&& chunks, const TPortionInfo& basePortion, - const std::shared_ptr<IStoragesManager>& operators); + static TWritePortionInfoWithBlobs BuildByBlobs(std::vector<TSplittedBlob>&& chunks, + TPortionInfoConstructor&& constructor, const std::shared_ptr<IStoragesManager>& operators); const TString& GetBlobByRangeVerified(const ui32 columnId, const ui32 chunkId) const { for (auto&& b : Blobs) { @@ -123,21 +114,28 @@ public: } TString DebugString() const { - return TStringBuilder() << PortionInfo.DebugString() << ";blobs_count=" << Blobs.size() << ";"; + return TStringBuilder() << "blobs_count=" << Blobs.size() << ";"; } - const TPortionInfo& GetPortionInfo() const { - return PortionInfo; + void FinalizePortionConstructor() { + AFL_VERIFY(!!PortionConstructor); + AFL_VERIFY(!PortionResult); + PortionResult = PortionConstructor->Build(true); + PortionConstructor.reset(); } - TPortionInfo& GetPortionInfo() { - return PortionInfo; + const TPortionInfo& GetPortionResult() const { + AFL_VERIFY(!PortionConstructor); + AFL_VERIFY(!!PortionResult); + return *PortionResult; } - friend IOutputStream& operator << (IOutputStream& out, const TWritePortionInfoWithBlobs& info) { - out << info.DebugString(); - return out; + TPortionInfoConstructor& GetPortionConstructor() { + AFL_VERIFY(!!PortionConstructor); + AFL_VERIFY(!PortionResult); + return *PortionConstructor; } + }; } // namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/engines/portions/ya.make b/ydb/core/tx/columnshard/engines/portions/ya.make index 7143d9fc5ea..d9f35dbbac8 100644 --- a/ydb/core/tx/columnshard/engines/portions/ya.make +++ b/ydb/core/tx/columnshard/engines/portions/ya.make @@ -6,6 +6,8 @@ SRCS( base_with_blobs.cpp read_with_blobs.cpp write_with_blobs.cpp + constructor.cpp + constructor_meta.cpp meta.cpp common.cpp index_chunk.cpp diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/read_metadata.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/read_metadata.h index 8be1cf19a97..371a09d7310 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/read_metadata.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/read_metadata.h @@ -74,7 +74,8 @@ public: << " at snapshot: " << GetRequestSnapshot().DebugString(); TBase::Dump(out); if (SelectInfo) { - out << ", " << *SelectInfo; + out << ", "; + SelectInfo->DebugStream(out); } } diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/meta.cpp b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/meta.cpp index f645bd4a308..2d7b8390aca 100644 --- a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/meta.cpp +++ b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/meta.cpp @@ -1,6 +1,7 @@ #include "meta.h" #include <ydb/core/tx/columnshard/blobs_action/abstract/storages_manager.h> #include <ydb/core/tx/columnshard/engines/portions/column_record.h> +#include <ydb/core/tx/columnshard/engines/portions/constructor.h> #include <ydb/core/tx/columnshard/engines/portions/portion_info.h> #include <ydb/core/tx/columnshard/engines/scheme/index_info.h> #include <ydb/core/tx/columnshard/engines/storage/chunks/data.h> @@ -11,7 +12,7 @@ namespace NKikimr::NOlap::NIndexes { -void TPortionIndexChunk::DoAddIntoPortionBeforeBlob(const TBlobRangeLink16& bRange, TPortionInfo& portionInfo) const { +void TPortionIndexChunk::DoAddIntoPortionBeforeBlob(const TBlobRangeLink16& bRange, TPortionInfoConstructor& portionInfo) const { AFL_VERIFY(!bRange.IsValid()); portionInfo.AddIndex(TIndexChunk(GetEntityId(), GetChunkIdxVerified(), RecordsCount, RawBytes, bRange)); } diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/meta.h b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/meta.h index 1f414cdc030..95997072e43 100644 --- a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/meta.h +++ b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/meta.h @@ -124,7 +124,7 @@ protected: virtual std::shared_ptr<arrow::Scalar> DoGetLastScalar() const override { return nullptr; } - virtual void DoAddIntoPortionBeforeBlob(const TBlobRangeLink16& bRange, TPortionInfo& portionInfo) const override; + virtual void DoAddIntoPortionBeforeBlob(const TBlobRangeLink16& bRange, TPortionInfoConstructor& portionInfo) const override; public: TPortionIndexChunk(const TChunkAddress& address, const ui32 recordsCount, const ui64 rawBytes, const TString& data) : TBase(address.GetColumnId(), address.GetChunkIdx()) diff --git a/ydb/core/tx/columnshard/engines/storage/chunks/data.cpp b/ydb/core/tx/columnshard/engines/storage/chunks/data.cpp index 373c88a765f..79613b5b979 100644 --- a/ydb/core/tx/columnshard/engines/storage/chunks/data.cpp +++ b/ydb/core/tx/columnshard/engines/storage/chunks/data.cpp @@ -1,9 +1,10 @@ #include "data.h" #include <ydb/core/tx/columnshard/engines/portions/portion_info.h> +#include <ydb/core/tx/columnshard/engines/portions/constructor.h> namespace NKikimr::NOlap::NChunks { -void TPortionIndexChunk::DoAddIntoPortionBeforeBlob(const TBlobRangeLink16& bRange, TPortionInfo& portionInfo) const { +void TPortionIndexChunk::DoAddIntoPortionBeforeBlob(const TBlobRangeLink16& bRange, TPortionInfoConstructor& portionInfo) const { AFL_VERIFY(!bRange.IsValid()); portionInfo.AddIndex(TIndexChunk(GetEntityId(), GetChunkIdxVerified(), RecordsCount, RawBytes, bRange)); } diff --git a/ydb/core/tx/columnshard/engines/storage/chunks/data.h b/ydb/core/tx/columnshard/engines/storage/chunks/data.h index dfa9189e560..d5a91c19609 100644 --- a/ydb/core/tx/columnshard/engines/storage/chunks/data.h +++ b/ydb/core/tx/columnshard/engines/storage/chunks/data.h @@ -31,7 +31,7 @@ protected: virtual std::shared_ptr<arrow::Scalar> DoGetLastScalar() const override { return nullptr; } - virtual void DoAddIntoPortionBeforeBlob(const TBlobRangeLink16& bRange, TPortionInfo& portionInfo) const override; + virtual void DoAddIntoPortionBeforeBlob(const TBlobRangeLink16& bRange, TPortionInfoConstructor& portionInfo) const override; virtual std::shared_ptr<IPortionDataChunk> DoCopyWithAnotherBlob(TString&& data, const TSimpleColumnInfo& /*columnInfo*/) const override { return std::make_shared<TPortionIndexChunk>(GetChunkAddressVerified(), RecordsCount, RawBytes, std::move(data)); } diff --git a/ydb/core/tx/columnshard/engines/storage/granule.cpp b/ydb/core/tx/columnshard/engines/storage/granule.cpp index e1b6a9b74c1..c9b0d22d517 100644 --- a/ydb/core/tx/columnshard/engines/storage/granule.cpp +++ b/ydb/core/tx/columnshard/engines/storage/granule.cpp @@ -43,13 +43,6 @@ bool TGranuleMeta::ErasePortion(const ui64 portion) { return true; } -void TGranuleMeta::AddColumnRecordOnLoad(const TIndexInfo& indexInfo, const TPortionInfo& portion, const TColumnChunkLoadContext& rec, const NKikimrTxColumnShard::TIndexPortionMeta* portionMeta) { - std::shared_ptr<TPortionInfo> pInfo = UpsertPortionOnLoad(portion); - TColumnRecord cRecord(pInfo->RegisterBlobId(rec.GetBlobRange().GetBlobId()), rec, indexInfo.GetColumnFeaturesVerified(rec.GetAddress().GetColumnId())); - AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "AddColumnRecordOnLoad")("portion_info", portion.DebugString())("record", cRecord.DebugString()); - pInfo->AddRecord(indexInfo, cRecord, portionMeta); -} - void TGranuleMeta::OnAfterChangePortion(const std::shared_ptr<TPortionInfo> portionAfter, NStorageOptimizer::IOptimizerPlanner::TModificationGuard* modificationGuard) { if (portionAfter) { AFL_VERIFY(PortionsByPK[portionAfter->IndexKeyStart()].emplace(portionAfter->GetPortion(), portionAfter).second); @@ -161,17 +154,11 @@ bool TGranuleMeta::InCompaction() const { return Activity.contains(EActivity::GeneralCompaction); } -std::shared_ptr<TPortionInfo> TGranuleMeta::UpsertPortionOnLoad(const TPortionInfo& portion) { - auto it = Portions.find(portion.GetPortion()); - AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "UpsertPortionOnLoad")("portion_info", portion.DebugString()); - if (it == Portions.end()) { - Y_ABORT_UNLESS(portion.Records.empty()); - auto portionNew = std::make_shared<TPortionInfo>(portion); - it = Portions.emplace(portion.GetPortion(), portionNew).first; - } else { - AFL_VERIFY(it->second->TakeSnapshots(portion))("self", it->second->DebugString())("item", portion.DebugString()); - } - return it->second; +std::shared_ptr<TPortionInfo> TGranuleMeta::UpsertPortionOnLoad(TPortionInfo&& portion) { + auto portionId = portion.GetPortionId(); + auto emplaceInfo = Portions.emplace(portionId, std::make_shared<TPortionInfo>(std::move(portion))); + AFL_VERIFY(emplaceInfo.second); + return emplaceInfo.first->second; } } // namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/engines/storage/granule.h b/ydb/core/tx/columnshard/engines/storage/granule.h index 7113b69256a..1002139fbf3 100644 --- a/ydb/core/tx/columnshard/engines/storage/granule.h +++ b/ydb/core/tx/columnshard/engines/storage/granule.h @@ -218,7 +218,6 @@ public: void OnAfterPortionsLoad() { auto g = OptimizerPlanner->StartModificationGuard(); for (auto&& i : Portions) { - i.second->OnAfterLoad(); OnAfterChangePortion(i.second, &g); } } @@ -272,9 +271,7 @@ public: ; } - std::shared_ptr<TPortionInfo> UpsertPortionOnLoad(const TPortionInfo& portion); - - void AddColumnRecordOnLoad(const TIndexInfo& indexInfo, const TPortionInfo& portion, const TColumnChunkLoadContext& rec, const NKikimrTxColumnShard::TIndexPortionMeta* portionMeta); + std::shared_ptr<TPortionInfo> UpsertPortionOnLoad(TPortionInfo&& portion); const THashMap<ui64, std::shared_ptr<TPortionInfo>>& GetPortions() const { return Portions; diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.h b/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.h index 8090de1dee6..e4c284aea5b 100644 --- a/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.h +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.h @@ -83,19 +83,11 @@ public: THashMap<ui64, std::shared_ptr<TPortionInfo>> RemovePortions; public: TModificationGuard& AddPortion(const std::shared_ptr<TPortionInfo>& portion) { - if (HasAppData() && AppDataVerified().ColumnShardConfig.GetSkipOldGranules() && portion->GetDeprecatedGranuleId() > 0) { - AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "skip_granule")("granule_id", portion->GetDeprecatedGranuleId()); - return *this; - } AFL_VERIFY(AddPortions.emplace(portion->GetPortionId(), portion).second); return*this; } TModificationGuard& RemovePortion(const std::shared_ptr<TPortionInfo>& portion) { - if (HasAppData() && AppDataVerified().ColumnShardConfig.GetSkipOldGranules() && portion->GetDeprecatedGranuleId() > 0) { - AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "skip_granule")("granule_id", portion->GetDeprecatedGranuleId()); - return *this; - } AFL_VERIFY(RemovePortions.emplace(portion->GetPortionId(), portion).second); return*this; } diff --git a/ydb/core/tx/columnshard/engines/ut/ut_insert_table.cpp b/ydb/core/tx/columnshard/engines/ut/ut_insert_table.cpp index 9ed9f504755..34a570d2de0 100644 --- a/ydb/core/tx/columnshard/engines/ut/ut_insert_table.cpp +++ b/ydb/core/tx/columnshard/engines/ut/ut_insert_table.cpp @@ -34,13 +34,13 @@ public: virtual void ErasePortion(const NOlap::TPortionInfo& /*portion*/) override { } - virtual bool LoadPortions(const std::function<void(NOlap::TPortionInfo&&, const NKikimrTxColumnShard::TIndexPortionMeta&)>& /*callback*/) override { + virtual bool LoadPortions(const std::function<void(NOlap::TPortionInfoConstructor&&, const NKikimrTxColumnShard::TIndexPortionMeta&)>& /*callback*/) override { return true; } void WriteColumn(const TPortionInfo&, const TColumnRecord&, const ui32 /*firstPKColumnId*/) override {} void EraseColumn(const TPortionInfo&, const TColumnRecord&) override {} - bool LoadColumns(const std::function<void(const TPortionInfo&, const TColumnChunkLoadContext&)>&) override { return true; } + bool LoadColumns(const std::function<void(NOlap::TPortionInfoConstructor&&, const TColumnChunkLoadContext&)>&) override { return true; } virtual void WriteIndex(const TPortionInfo& /*portion*/, const TIndexChunk& /*row*/) override {} virtual void EraseIndex(const TPortionInfo& /*portion*/, const TIndexChunk& /*row*/) override {} diff --git a/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp b/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp index 641b8e604b1..e8813bed509 100644 --- a/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp +++ b/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp @@ -35,7 +35,7 @@ private: std::map<TPortionAddress, std::map<TChunkAddress, TColumnChunkLoadContext>> LoadContexts; public: struct TIndex { - THashMap<ui64, THashMap<ui64, TPortionInfo>> Columns; // pathId -> portions + THashMap<ui64, THashMap<ui64, TPortionInfoConstructor>> Columns; // pathId -> portions THashMap<ui32, ui64> Counters; }; @@ -86,7 +86,7 @@ public: virtual void ErasePortion(const NOlap::TPortionInfo& /*portion*/) override { } - virtual bool LoadPortions(const std::function<void(NOlap::TPortionInfo&&, const NKikimrTxColumnShard::TIndexPortionMeta&)>& /*callback*/) override { + virtual bool LoadPortions(const std::function<void(NOlap::TPortionInfoConstructor&&, const NKikimrTxColumnShard::TIndexPortionMeta&)>& /*callback*/) override { return true; } @@ -104,19 +104,21 @@ public: } auto it = data.find(portion.GetPortion()); if (it == data.end()) { - it = data.emplace(portion.GetPortion(), portion.CopyBeforeChunksRebuild()).first; + it = data.emplace(portion.GetPortion(), TPortionInfoConstructor(portion, false, true)).first; } else { - Y_ABORT_UNLESS(portion.GetPathId() == it->second.GetPathId() && portion.GetPortion() == it->second.GetPortion()); + Y_ABORT_UNLESS(portion.GetPathId() == it->second.GetPathId() && portion.GetPortion() == it->second.GetPortionIdVerified()); } it->second.SetMinSnapshotDeprecated(portion.GetMinSnapshotDeprecated()); if (portion.HasRemoveSnapshot()) { - it->second.SetRemoveSnapshot(portion.GetRemoveSnapshotVerified()); + if (!it->second.HasRemoveSnapshot()) { + it->second.SetRemoveSnapshot(portion.GetRemoveSnapshotVerified()); + } } else { AFL_VERIFY(!it->second.HasRemoveSnapshot()); } bool replaced = false; - for (auto& rec : it->second.Records) { + for (auto& rec : it->second.MutableRecords()) { if (rec.IsEqualTest(row)) { rec = row; replaced = true; @@ -124,7 +126,7 @@ public: } } if (!replaced) { - it->second.Records.push_back(row); + it->second.MutableRecords().emplace_back(row); } } @@ -135,26 +137,26 @@ public: auto& portionLocal = it->second; std::vector<TColumnRecord> filtered; - for (auto& rec : portionLocal.Records) { + for (auto& rec : portionLocal.GetRecords()) { if (!rec.IsEqualTest(row)) { filtered.push_back(rec); } } - portionLocal.Records.swap(filtered); + portionLocal.MutableRecords().swap(filtered); } - bool LoadColumns(const std::function<void(const TPortionInfo&, const TColumnChunkLoadContext&)>& callback) override { + bool LoadColumns(const std::function<void(NOlap::TPortionInfoConstructor&&, const TColumnChunkLoadContext&)>& callback) override { auto& columns = Indices[0].Columns; for (auto& [pathId, portions] : columns) { for (auto& [portionId, portionLocal] : portions) { auto copy = portionLocal; - copy.ResetMeta(); - copy.Records.clear(); - for (const auto& rec : portionLocal.Records) { + copy.MutableRecords().clear(); + for (const auto& rec : portionLocal.GetRecords()) { auto itContextLoader = LoadContexts[copy.GetAddress()].find(rec.GetAddress()); Y_ABORT_UNLESS(itContextLoader != LoadContexts[copy.GetAddress()].end()); - callback(copy, itContextLoader->second); - LoadContexts[copy.GetAddress()].erase(itContextLoader); + auto address = copy.GetAddress(); + callback(std::move(copy), itContextLoader->second); + LoadContexts[address].erase(itContextLoader); } } } @@ -267,10 +269,10 @@ TString MakeTestBlob(i64 start = 0, i64 end = 100) { void AddIdsToBlobs(std::vector<TWritePortionInfoWithBlobs>& portions, NBlobOperations::NRead::TCompositeReadBlobs& blobs, ui32& step) { for (auto& portion : portions) { - for (auto& rec : portion.GetPortionInfo().Records) { - rec.BlobRange.BlobIdx = portion.GetPortionInfo().RegisterBlobId(MakeUnifiedBlobId(++step, portion.GetBlobFullSizeVerified(rec.ColumnId, rec.Chunk))); + for (auto& rec : portion.GetPortionConstructor().MutableRecords()) { + rec.BlobRange.BlobIdx = portion.GetPortionConstructor().RegisterBlobId(MakeUnifiedBlobId(++step, portion.GetBlobFullSizeVerified(rec.ColumnId, rec.Chunk))); TString data = portion.GetBlobByRangeVerified(rec.ColumnId, rec.Chunk); - blobs.Add(IStoragesManager::DefaultStorageId, portion.GetPortionInfo().RestoreBlobRange(rec.BlobRange), std::move(data)); + blobs.Add(IStoragesManager::DefaultStorageId, portion.GetPortionConstructor().RestoreBlobRange(rec.BlobRange), std::move(data)); } } } @@ -303,7 +305,7 @@ bool Insert(TColumnEngineForLogs& engine, TTestDbWrapper& db, TSnapshot snap, AddIdsToBlobs(changes->AppendedPortions, blobs, step); - const bool result = engine.ApplyChanges(db, changes, snap); + const bool result = engine.ApplyChangesOnTxCreate(changes, snap) && engine.ApplyChangesOnExecute(db, changes, snap); NOlap::TWriteIndexContext contextExecute(nullptr, db, engine); changes->WriteIndexOnExecute(nullptr, contextExecute); @@ -334,15 +336,15 @@ bool Compact(TColumnEngineForLogs& engine, TTestDbWrapper& db, TSnapshot snap, N // UNIT_ASSERT_VALUES_EQUAL(changes->GetTmpGranuleIds().size(), expected.NewGranules); - const bool result = engine.ApplyChanges(db, changes, snap); + const bool result = engine.ApplyChangesOnTxCreate(changes, snap) && engine.ApplyChangesOnExecute(db, changes, snap); NOlap::TWriteIndexContext contextExecute(nullptr, db, engine); changes->WriteIndexOnExecute(nullptr, contextExecute); NOlap::TWriteIndexCompleteContext contextComplete(NActors::TActivationContext::AsActorContext(), 0, 0, TDuration::Zero(), engine); changes->WriteIndexOnComplete(nullptr, contextComplete); if (blobsPool) { for (auto&& i : changes->AppendedPortions) { - for (auto&& r : i.GetPortionInfo().Records) { - Y_ABORT_UNLESS(blobsPool->emplace(i.GetPortionInfo().RestoreBlobRange(r.BlobRange), i.GetBlobByRangeVerified(r.ColumnId, r.Chunk)).second); + for (auto&& r : i.GetPortionResult().GetRecords()) { + Y_ABORT_UNLESS(blobsPool->emplace(i.GetPortionResult().RestoreBlobRange(r.BlobRange), i.GetBlobByRangeVerified(r.ColumnId, r.Chunk)).second); } } } @@ -361,7 +363,7 @@ bool Cleanup(TColumnEngineForLogs& engine, TTestDbWrapper& db, TSnapshot snap, u changes->StartEmergency(); - const bool result = engine.ApplyChanges(db, changes, snap); + const bool result = engine.ApplyChangesOnTxCreate(changes, snap) && engine.ApplyChangesOnExecute(db, changes, snap); NOlap::TWriteIndexContext contextExecute(nullptr, db, engine); changes->WriteIndexOnExecute(nullptr, contextExecute); NOlap::TWriteIndexCompleteContext contextComplete(NActors::TActivationContext::AsActorContext(), 0, 0, TDuration::Zero(), engine); @@ -375,11 +377,11 @@ bool Ttl(TColumnEngineForLogs& engine, TTestDbWrapper& db, std::vector<std::shared_ptr<TTTLColumnEngineChanges>> vChanges = engine.StartTtl(pathEviction, EmptyDataLocksManager, 512 * 1024 * 1024); AFL_VERIFY(vChanges.size() == 1)("count", vChanges.size()); auto changes = vChanges.front(); - UNIT_ASSERT_VALUES_EQUAL(changes->PortionsToRemove.size(), expectedToDrop); + UNIT_ASSERT_VALUES_EQUAL(changes->GetPortionsToRemove().size(), expectedToDrop); changes->StartEmergency(); - const bool result = engine.ApplyChanges(db, changes, TSnapshot(1,1)); + const bool result = engine.ApplyChangesOnTxCreate(changes, TSnapshot(1, 1)) && engine.ApplyChangesOnExecute(db, changes, TSnapshot(1, 1)); NOlap::TWriteIndexContext contextExecute(nullptr, db, engine); changes->WriteIndexOnExecute(nullptr, contextExecute); NOlap::TWriteIndexCompleteContext contextComplete(NActors::TActivationContext::AsActorContext(), 0, 0, TDuration::Zero(), engine); diff --git a/ydb/core/tx/columnshard/normalizer/portion/min_max.cpp b/ydb/core/tx/columnshard/normalizer/portion/min_max.cpp index 4923592882f..e459ba07086 100644 --- a/ydb/core/tx/columnshard/normalizer/portion/min_max.cpp +++ b/ydb/core/tx/columnshard/normalizer/portion/min_max.cpp @@ -2,8 +2,9 @@ #include "normalizer.h" #include <ydb/core/tx/columnshard/engines/portions/portion_info.h> -#include <ydb/core/tx/columnshard/tables_manager.h> #include <ydb/core/tx/columnshard/engines/scheme/filtered_scheme.h> +#include <ydb/core/tx/columnshard/engines/portions/constructor.h> +#include <ydb/core/tx/columnshard/tables_manager.h> #include <ydb/core/formats/arrow/arrow_helpers.h> @@ -46,7 +47,7 @@ protected: auto preparedBatch = portionInfo->PrepareForAssemble(**blobSchema, *filteredSchema, blobsDataAssemble); auto batch = preparedBatch.Assemble(); Y_ABORT_UNLESS(!!batch); - portionInfo->AddMetadata(**blobSchema, batch, portionInfo->GetMeta().GetTierName()); +// portionInfo->AddMetadata(**blobSchema, batch, portionInfo->GetMeta().GetTierName()); } auto changes = std::make_shared<TPortionsNormalizer::TNormalizerResult>(std::move(Portions), std::move(firstPKColumnIdByPathId)); @@ -78,10 +79,7 @@ public: return portion->GetTotalRawBytes(); } - static bool CheckPortion(const TPortionInfo& portionInfo) { - if (!portionInfo.GetMeta().HasPrimaryKeyBorders() || !portionInfo.GetMeta().HasSnapshotMinMax()) { - return false; - } + static bool CheckPortion(const TPortionInfo& /*portionInfo*/) { return true; } @@ -114,7 +112,7 @@ public: auto rowProto = chunk.GetMeta().SerializeToProto(); *rowProto.MutablePortionMeta() = portionInfo->GetMeta().SerializeToProto(); - db.Table<Schema::IndexColumns>().Key(0, portionInfo->GetDeprecatedGranuleId(), chunk.ColumnId, + db.Table<Schema::IndexColumns>().Key(0, portionInfo->GetPathId(), chunk.ColumnId, portionInfo->GetMinSnapshotDeprecated().GetPlanStep(), portionInfo->GetMinSnapshotDeprecated().GetTxId(), portionInfo->GetPortion(), chunk.Chunk).Update( NIceDb::TUpdate<Schema::IndexColumns::Metadata>(rowProto.SerializeAsString()) ); @@ -146,7 +144,7 @@ TConclusion<std::vector<INormalizerTask::TPtr>> TPortionsNormalizer::Init(const return tasks; } - THashMap<ui64, std::shared_ptr<TPortionInfo>> portions; + THashMap<ui64, TPortionInfoConstructor> portions; auto schemas = std::make_shared<THashMap<ui64, ISnapshotSchema::TPtr>>(); auto pkColumnIds = TMinMaxSnapshotChangesTask::GetColumnsFilter(tablesManager.GetPrimaryIndexSafe().GetVersionedIndex().GetLastSchema()); @@ -157,39 +155,32 @@ TConclusion<std::vector<INormalizerTask::TPtr>> TPortionsNormalizer::Init(const } TPortionInfo::TSchemaCursor schema(tablesManager.GetPrimaryIndexSafe().GetVersionedIndex()); - auto initPortionCB = [&](const TPortionInfo& portion, const TColumnChunkLoadContext& loadContext) { + auto initPortionCB = [&](TPortionInfoConstructor&& portion, const TColumnChunkLoadContext& loadContext) { auto currentSchema = schema.GetSchema(portion); - AFL_VERIFY(portion.ValidSnapshotInfo())("details", portion.DebugString()); if (!pkColumnIds.contains(loadContext.GetAddress().GetColumnId())) { return; } - auto it = portions.find(portion.GetPortion()); - auto portionMeta = loadContext.GetPortionMeta(); + auto it = portions.find(portion.GetPortionIdVerified()); if (it == portions.end()) { - Y_ABORT_UNLESS(portion.Records.empty()); - (*schemas)[portion.GetPortionId()] = currentSchema; - it = portions.emplace(portion.GetPortion(), std::make_shared<TPortionInfo>(portion)).first; + (*schemas)[portion.GetPortionIdVerified()] = currentSchema; + const ui64 portionId = portion.GetPortionIdVerified(); + it = portions.emplace(portionId, std::move(portion)).first; + } else { + it->second.Merge(std::move(portion)); } - TColumnRecord rec(it->second->RegisterBlobId(loadContext.GetBlobRange().GetBlobId()), loadContext, currentSchema->GetIndexInfo().GetColumnFeaturesVerified(loadContext.GetAddress().GetColumnId())); - AFL_VERIFY(it->second->IsEqualWithSnapshots(portion))("self", it->second->DebugString())("item", portion.DebugString()); - it->second->AddRecord(currentSchema->GetIndexInfo(), rec, portionMeta); + it->second.LoadRecord(currentSchema->GetIndexInfo(), loadContext); }; while (!rowset.EndOfSet()) { - TPortionInfo portion = TPortionInfo::BuildEmpty(); - auto index = rowset.GetValue<Schema::IndexColumns::Index>(); - Y_ABORT_UNLESS(index == 0); - - portion.SetPathId(rowset.GetValue<Schema::IndexColumns::PathId>()); + TPortionInfoConstructor portion(rowset.GetValue<Schema::IndexColumns::PathId>(), rowset.GetValue<Schema::IndexColumns::Portion>()); + Y_ABORT_UNLESS(rowset.GetValue<Schema::IndexColumns::Index>() == 0); portion.SetMinSnapshotDeprecated(NOlap::TSnapshot(rowset.GetValue<Schema::IndexColumns::PlanStep>(), rowset.GetValue<Schema::IndexColumns::TxId>())); - portion.SetPortion(rowset.GetValue<Schema::IndexColumns::Portion>()); - portion.SetDeprecatedGranuleId(rowset.GetValue<Schema::IndexColumns::Granule>()); portion.SetRemoveSnapshot(rowset.GetValue<Schema::IndexColumns::XPlanStep>(), rowset.GetValue<Schema::IndexColumns::XTxId>()); NOlap::TColumnChunkLoadContext chunkLoadContext(rowset, &DsGroupSelector); - initPortionCB(portion, chunkLoadContext); + initPortionCB(std::move(portion), chunkLoadContext); if (!rowset.Next()) { return TConclusionStatus::Fail("Not ready"); @@ -201,12 +192,13 @@ TConclusion<std::vector<INormalizerTask::TPtr>> TPortionsNormalizer::Init(const package.reserve(100); ui64 brokenPortioncCount = 0; - for (auto&& portion : portions) { - if (TMinMaxSnapshotChangesTask::CheckPortion(*portion.second)) { + for (auto&& portionConstructor : portions) { + auto portionInfo = std::make_shared<TPortionInfo>(portionConstructor.second.Build(false)); + if (TMinMaxSnapshotChangesTask::CheckPortion(*portionInfo)) { continue; } ++brokenPortioncCount; - package.emplace_back(portion.second); + package.emplace_back(portionInfo); if (package.size() == 1000) { std::vector<std::shared_ptr<TPortionInfo>> local; local.swap(package); diff --git a/ydb/core/tx/columnshard/splitter/abstract/chunks.h b/ydb/core/tx/columnshard/splitter/abstract/chunks.h index e873ff80560..e3be37be2bd 100644 --- a/ydb/core/tx/columnshard/splitter/abstract/chunks.h +++ b/ydb/core/tx/columnshard/splitter/abstract/chunks.h @@ -13,6 +13,7 @@ class TSplitterCounters; namespace NKikimr::NOlap { class TPortionInfo; +class TPortionInfoConstructor; class TSimpleColumnInfo; class TColumnSaver; @@ -33,7 +34,7 @@ protected: virtual std::optional<ui32> DoGetRecordsCount() const = 0; virtual std::shared_ptr<arrow::Scalar> DoGetFirstScalar() const = 0; virtual std::shared_ptr<arrow::Scalar> DoGetLastScalar() const = 0; - virtual void DoAddIntoPortionBeforeBlob(const TBlobRangeLink16& bRange, TPortionInfo& portionInfo) const = 0; + virtual void DoAddIntoPortionBeforeBlob(const TBlobRangeLink16& bRange, TPortionInfoConstructor& portionInfo) const = 0; virtual std::shared_ptr<IPortionDataChunk> DoCopyWithAnotherBlob(TString&& /*data*/, const TSimpleColumnInfo& /*columnInfo*/) const { AFL_VERIFY(false); return nullptr; @@ -116,7 +117,7 @@ public: } } - void AddIntoPortionBeforeBlob(const TBlobRangeLink16& bRange, TPortionInfo& portionInfo) const { + void AddIntoPortionBeforeBlob(const TBlobRangeLink16& bRange, TPortionInfoConstructor& portionInfo) const { AFL_VERIFY(!bRange.IsValid()); return DoAddIntoPortionBeforeBlob(bRange, portionInfo); } diff --git a/ydb/core/tx/columnshard/splitter/chunks.cpp b/ydb/core/tx/columnshard/splitter/chunks.cpp index c00c3f43e61..8ebbd736c12 100644 --- a/ydb/core/tx/columnshard/splitter/chunks.cpp +++ b/ydb/core/tx/columnshard/splitter/chunks.cpp @@ -1,6 +1,7 @@ #include "chunks.h" #include <ydb/core/tx/columnshard/engines/portions/column_record.h> #include <ydb/core/tx/columnshard/engines/portions/portion_info.h> +#include <ydb/core/tx/columnshard/engines/portions/constructor.h> namespace NKikimr::NOlap { @@ -24,7 +25,7 @@ std::vector<std::shared_ptr<IPortionDataChunk>> IPortionColumnChunk::DoInternalS return result; } -void IPortionColumnChunk::DoAddIntoPortionBeforeBlob(const TBlobRangeLink16& bRange, TPortionInfo& portionInfo) const { +void IPortionColumnChunk::DoAddIntoPortionBeforeBlob(const TBlobRangeLink16& bRange, TPortionInfoConstructor& portionInfo) const { AFL_VERIFY(!bRange.IsValid()); TColumnRecord rec(GetChunkAddressVerified(), bRange, BuildSimpleChunkMeta()); portionInfo.AppendOneChunkColumn(std::move(rec)); diff --git a/ydb/core/tx/columnshard/splitter/chunks.h b/ydb/core/tx/columnshard/splitter/chunks.h index bd7d6c80efd..eae2f4d58fd 100644 --- a/ydb/core/tx/columnshard/splitter/chunks.h +++ b/ydb/core/tx/columnshard/splitter/chunks.h @@ -19,7 +19,7 @@ protected: return DoGetRecordsCountImpl(); } - virtual void DoAddIntoPortionBeforeBlob(const TBlobRangeLink16& bRange, TPortionInfo& portionInfo) const override; + virtual void DoAddIntoPortionBeforeBlob(const TBlobRangeLink16& bRange, TPortionInfoConstructor& portionInfo) const override; virtual std::vector<std::shared_ptr<IPortionDataChunk>> DoInternalSplitImpl(const TColumnSaver& saver, const std::shared_ptr<NColumnShard::TSplitterCounters>& counters, const std::vector<ui64>& splitSizes) const = 0; virtual std::vector<std::shared_ptr<IPortionDataChunk>> DoInternalSplit(const TColumnSaver& saver, const std::shared_ptr<NColumnShard::TSplitterCounters>& counters, const std::vector<ui64>& splitSizes) const override; diff --git a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp index e084c3386f5..65277b9f151 100644 --- a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp +++ b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp @@ -2513,9 +2513,9 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { Y_ABORT_UNLESS(append->AppendedPortions.size()); Cerr << "Added portions:"; for (const auto& portion : append->AppendedPortions) { + Y_UNUSED(portion); ++addedPortions; - ui64 portionId = addedPortions; - Cerr << " " << portionId << "(" << portion.GetPortionInfo().GetPortion() << ")"; + Cerr << " " << addedPortions; } Cerr << Endl; } |
