diff options
author | ivanmorozov333 <ivanmorozov@ydb.tech> | 2024-10-31 10:09:01 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-10-31 10:09:01 +0300 |
commit | 21fa904d5864271db615aa2ea5fa53419beb32cb (patch) | |
tree | 8a0a98a1b28207190212f22d0ccf5ff681d39d1a | |
parent | b9d09c35e7ac68cc901fd377249a1d3e091b60a5 (diff) | |
download | ydb-21fa904d5864271db615aa2ea5fa53419beb32cb.tar.gz |
actualize local db for columnshards (#11115)
33 files changed, 703 insertions, 282 deletions
diff --git a/.github/config/muted_ya.txt b/.github/config/muted_ya.txt index 3d8ceb6df33..5ae37139dfe 100644 --- a/.github/config/muted_ya.txt +++ b/.github/config/muted_ya.txt @@ -45,7 +45,6 @@ ydb/core/kqp/ut/sysview KqpSystemView.PartitionStatsFollower ydb/core/mind/hive/ut THiveTest.DrainWithHiveRestart ydb/core/persqueue/ut [*/*] chunk chunk ydb/core/quoter/ut QuoterWithKesusTest.PrefetchCoefficient -ydb/core/tx/columnshard/ut_rw Normalizers.CleanEmptyPortionsNormalizer ydb/core/tx/schemeshard/ut_move_reboots TSchemeShardMoveRebootsTest.WithData ydb/core/tx/schemeshard/ut_move_reboots TSchemeShardMoveRebootsTest.WithDataAndPersistentPartitionStats ydb/core/tx/schemeshard/ut_pq_reboots TPqGroupTestReboots.AlterWithReboots-PQConfigTransactionsAtSchemeShard-false diff --git a/ydb/core/tx/columnshard/columnshard_schema.h b/ydb/core/tx/columnshard/columnshard_schema.h index 37307761375..4c2e9e9c9fb 100644 --- a/ydb/core/tx/columnshard/columnshard_schema.h +++ b/ydb/core/tx/columnshard/columnshard_schema.h @@ -901,27 +901,55 @@ struct Schema : NIceDb::Schema { } namespace NKikimr::NOlap { +class TPortionLoadContext { +private: + YDB_READONLY(ui64, PathId, 0); + YDB_READONLY(ui64, PortionId, 0); + YDB_READONLY_DEF(NKikimrTxColumnShard::TIndexPortionMeta, MetaProto); + +public: + template <class TSource> + TPortionLoadContext(const TSource& rowset) { + PathId = rowset.template GetValue<NColumnShard::Schema::IndexPortions::PathId>(); + PortionId = rowset.template GetValue<NColumnShard::Schema::IndexPortions::PortionId>(); + const TString metadata = rowset.template GetValue<NColumnShard::Schema::IndexPortions::Metadata>(); + AFL_VERIFY(MetaProto.ParseFromArray(metadata.data(), metadata.size()))("event", "cannot parse metadata as protobuf"); + } +}; + class TColumnChunkLoadContext { private: YDB_READONLY_DEF(TBlobRange, BlobRange); TChunkAddress Address; + YDB_READONLY(ui64, PathId, 0); + YDB_READONLY(ui64, PortionId, 0); 
YDB_READONLY_DEF(NKikimrTxColumnShard::TIndexColumnMeta, MetaProto); + YDB_READONLY(TSnapshot, RemoveSnapshot, TSnapshot::Zero()); + YDB_READONLY(TSnapshot, MinSnapshotDeprecated, TSnapshot::Zero()); + public: const TChunkAddress& GetAddress() const { return Address; } - TColumnChunkLoadContext(const TChunkAddress& address, const TBlobRange& bRange, const NKikimrTxColumnShard::TIndexColumnMeta& metaProto) + TColumnChunkLoadContext(const ui64 pathId, const ui64 portionId, const TChunkAddress& address, const TBlobRange& bRange, + const NKikimrTxColumnShard::TIndexColumnMeta& metaProto) : BlobRange(bRange) , Address(address) - , MetaProto(metaProto) - { - + , PathId(pathId) + , PortionId(portionId) + , MetaProto(metaProto) { } template <class TSource> TColumnChunkLoadContext(const TSource& rowset, const IBlobGroupSelector* dsGroupSelector) - : Address(rowset.template GetValue<NColumnShard::Schema::IndexColumns::ColumnIdx>(), rowset.template GetValue<NColumnShard::Schema::IndexColumns::Chunk>()) { + : Address(rowset.template GetValue<NColumnShard::Schema::IndexColumns::ColumnIdx>(), + rowset.template GetValue<NColumnShard::Schema::IndexColumns::Chunk>()) + , RemoveSnapshot(rowset.template GetValue<NColumnShard::Schema::IndexColumns::XPlanStep>(), + rowset.template GetValue<NColumnShard::Schema::IndexColumns::XTxId>()) + , MinSnapshotDeprecated(rowset.template GetValue<NColumnShard::Schema::IndexColumns::PlanStep>(), + rowset.template GetValue<NColumnShard::Schema::IndexColumns::TxId>()) + { AFL_VERIFY(Address.GetColumnId())("event", "incorrect address")("address", Address.DebugString()); TString strBlobId = rowset.template GetValue<NColumnShard::Schema::IndexColumns::Blob>(); Y_ABORT_UNLESS(strBlobId.size() == sizeof(TLogoBlobID), "Size %" PRISZT " doesn't match TLogoBlobID", strBlobId.size()); @@ -929,29 +957,38 @@ public: BlobRange.BlobId = NOlap::TUnifiedBlobId(dsGroupSelector->GetGroup(logoBlobId), logoBlobId); BlobRange.Offset = rowset.template 
GetValue<NColumnShard::Schema::IndexColumns::Offset>(); BlobRange.Size = rowset.template GetValue<NColumnShard::Schema::IndexColumns::Size>(); + PathId = rowset.template GetValue<NColumnShard::Schema::IndexColumns::PathId>(); + PortionId = rowset.template GetValue<NColumnShard::Schema::IndexColumns::Portion>(); AFL_VERIFY(BlobRange.BlobId.IsValid() && BlobRange.Size)("event", "incorrect blob")("blob", BlobRange.ToString()); const TString metadata = rowset.template GetValue<NColumnShard::Schema::IndexColumns::Metadata>(); AFL_VERIFY(MetaProto.ParseFromArray(metadata.data(), metadata.size()))("event", "cannot parse metadata as protobuf"); } - - const NKikimrTxColumnShard::TIndexPortionMeta* GetPortionMeta() const { - if (MetaProto.HasPortionMeta()) { - return &MetaProto.GetPortionMeta(); - } else { - return nullptr; - } - } }; class TIndexChunkLoadContext { private: YDB_READONLY_DEF(std::optional<TBlobRange>, BlobRange); YDB_READONLY_DEF(std::optional<TString>, BlobData); + YDB_READONLY(ui64, PathId, 0); + YDB_READONLY(ui64, PortionId, 0); TChunkAddress Address; const ui32 RecordsCount; const ui32 RawBytes; public: + ui32 GetRawBytes() const { + return RawBytes; + } + + ui32 GetDataSize() const { + if (BlobRange) { + return BlobRange->GetSize(); + } else { + AFL_VERIFY(!!BlobData); + return BlobData->size(); + } + } + TIndexChunk BuildIndexChunk(const TBlobRangeLink16::TLinkId blobLinkId) const { AFL_VERIFY(BlobRange); return TIndexChunk(Address.GetColumnId(), Address.GetChunkIdx(), RecordsCount, RawBytes, BlobRange->BuildLink(blobLinkId)); @@ -964,7 +1001,9 @@ public: template <class TSource> TIndexChunkLoadContext(const TSource& rowset, const IBlobGroupSelector* dsGroupSelector) - : Address(rowset.template GetValue<NColumnShard::Schema::IndexIndexes::IndexId>(), rowset.template GetValue<NColumnShard::Schema::IndexIndexes::ChunkIdx>()) + : PathId(rowset.template GetValue<NColumnShard::Schema::IndexIndexes::PathId>()) + , PortionId(rowset.template 
GetValue<NColumnShard::Schema::IndexIndexes::PortionId>()) + , Address(rowset.template GetValue<NColumnShard::Schema::IndexIndexes::IndexId>(), rowset.template GetValue<NColumnShard::Schema::IndexIndexes::ChunkIdx>()) , RecordsCount(rowset.template GetValue<NColumnShard::Schema::IndexIndexes::RecordsCount>()) , RawBytes(rowset.template GetValue<NColumnShard::Schema::IndexIndexes::RawBytes>()) { diff --git a/ydb/core/tx/columnshard/common/blob.h b/ydb/core/tx/columnshard/common/blob.h index e1c10a46d40..b7e3f0fa5a7 100644 --- a/ydb/core/tx/columnshard/common/blob.h +++ b/ydb/core/tx/columnshard/common/blob.h @@ -189,6 +189,14 @@ struct TBlobRange { ui32 Offset; ui32 Size; + ui32 GetSize() const { + return Size; + } + + ui32 GetOffset() const { + return Offset; + } + TString GetData(const TString& blobData) const; bool operator<(const TBlobRange& br) const { diff --git a/ydb/core/tx/columnshard/data_sharing/destination/session/destination.cpp b/ydb/core/tx/columnshard/data_sharing/destination/session/destination.cpp index f5c35e58943..cb960b38c49 100644 --- a/ydb/core/tx/columnshard/data_sharing/destination/session/destination.cpp +++ b/ydb/core/tx/columnshard/data_sharing/destination/session/destination.cpp @@ -20,7 +20,7 @@ NKikimr::TConclusionStatus TDestinationSession::DataReceived( AFL_VERIFY(it != PathIds.end())("path_id_undefined", i.first); for (auto&& portion : i.second.DetachPortions()) { portion.MutablePortionInfo().SetPathId(it->second); - index.AppendPortion(portion.GetPortionInfo()); + index.AppendPortion(portion.MutablePortionInfoPtr()); } } return TConclusionStatus::Success(); diff --git a/ydb/core/tx/columnshard/engines/changes/with_appended.cpp b/ydb/core/tx/columnshard/engines/changes/with_appended.cpp index bb99f3d4c68..46c39964da5 100644 --- a/ydb/core/tx/columnshard/engines/changes/with_appended.cpp +++ b/ydb/core/tx/columnshard/engines/changes/with_appended.cpp @@ -111,7 +111,7 @@ void 
TChangesWithAppend::DoWriteIndexOnComplete(NColumnShard::TColumnShard* self context.EngineLogs.AddCleanupPortion(i); } for (auto& portionBuilder : AppendedPortions) { - context.EngineLogs.AppendPortion(portionBuilder.GetPortionResult().GetPortionInfo()); + context.EngineLogs.AppendPortion(portionBuilder.GetPortionResult().MutablePortionInfoPtr()); } } } diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp index e4f383e765b..dc1665cf615 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp @@ -2,25 +2,25 @@ #include "filter.h" #include "changes/actualization/construction/context.h" -#include "changes/indexation.h" -#include "changes/general_compaction.h" #include "changes/cleanup_portions.h" #include "changes/cleanup_tables.h" +#include "changes/general_compaction.h" +#include "changes/indexation.h" #include "changes/ttl.h" #include "portions/constructor.h" #include <ydb/core/base/appdata.h> -#include <ydb/core/tx/columnshard/common/limits.h> -#include <ydb/core/tx/columnshard/hooks/abstract/abstract.h> -#include <ydb/core/tx/columnshard/columnshard_ttl.h> #include <ydb/core/tx/columnshard/columnshard_schema.h> +#include <ydb/core/tx/columnshard/columnshard_ttl.h> +#include <ydb/core/tx/columnshard/common/limits.h> #include <ydb/core/tx/columnshard/data_locks/manager/manager.h> +#include <ydb/core/tx/columnshard/hooks/abstract/abstract.h> #include <ydb/core/tx/tiering/manager.h> +#include <ydb/library/actors/core/monotonic_provider.h> #include <ydb/library/conclusion/status.h> #include <library/cpp/time_provider/time_provider.h> -#include <ydb/library/actors/core/monotonic_provider.h> #include <concepts> @@ -32,14 +32,13 @@ TColumnEngineForLogs::TColumnEngineForLogs( , StoragesManager(storagesManager) , TabletId(tabletId) , LastPortion(0) - , LastGranule(0) -{ + , LastGranule(0) { ActualizationController = 
std::make_shared<NActualizer::TController>(); RegisterSchemaVersion(snapshot, schema); } -TColumnEngineForLogs::TColumnEngineForLogs(ui64 tabletId, const std::shared_ptr<IStoragesManager>& storagesManager, - const TSnapshot& snapshot, TIndexInfo&& schema) +TColumnEngineForLogs::TColumnEngineForLogs( + ui64 tabletId, const std::shared_ptr<IStoragesManager>& storagesManager, const TSnapshot& snapshot, TIndexInfo&& schema) : GranulesStorage(std::make_shared<TGranulesStorage>(SignalCounters, storagesManager)) , StoragesManager(storagesManager) , TabletId(tabletId) @@ -58,13 +57,14 @@ const TColumnEngineStats& TColumnEngineForLogs::GetTotalStats() { return Counters; } -void TColumnEngineForLogs::UpdatePortionStats(const TPortionInfo& portionInfo, EStatsUpdateType updateType, - const TPortionInfo* exPortionInfo) { +void TColumnEngineForLogs::UpdatePortionStats(const TPortionInfo& portionInfo, EStatsUpdateType updateType, const TPortionInfo* exPortionInfo) { if (IS_LOG_PRIORITY_ENABLED(NActors::NLog::PRI_DEBUG, NKikimrServices::TX_COLUMNSHARD)) { auto before = Counters.Active(); UpdatePortionStats(Counters, portionInfo, updateType, exPortionInfo); auto after = Counters.Active(); - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "portion_stats_updated")("type", updateType)("path_id", portionInfo.GetPathId())("portion", portionInfo.GetPortionId())("before_size", before.Bytes)("after_size", after.Bytes)("before_rows", before.Rows)("after_rows", after.Rows); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "portion_stats_updated")("type", updateType)("path_id", portionInfo.GetPathId())( + "portion", portionInfo.GetPortionId())("before_size", before.Bytes)("after_size", after.Bytes)("before_rows", before.Rows)( + "after_rows", after.Rows); } else { UpdatePortionStats(Counters, portionInfo, updateType, exPortionInfo); } @@ -89,31 +89,28 @@ TColumnEngineStats::TPortionsStats DeltaStats(const TPortionInfo& portionInfo) { return deltaStats; } -void 
TColumnEngineForLogs::UpdatePortionStats(TColumnEngineStats& engineStats, const TPortionInfo& portionInfo, - EStatsUpdateType updateType, - const TPortionInfo* exPortionInfo) const { +void TColumnEngineForLogs::UpdatePortionStats( + TColumnEngineStats& engineStats, const TPortionInfo& portionInfo, EStatsUpdateType updateType, const TPortionInfo* exPortionInfo) const { TColumnEngineStats::TPortionsStats deltaStats = DeltaStats(portionInfo); Y_ABORT_UNLESS(!exPortionInfo || exPortionInfo->GetMeta().Produced != TPortionMeta::EProduced::UNSPECIFIED); Y_ABORT_UNLESS(portionInfo.GetMeta().Produced != TPortionMeta::EProduced::UNSPECIFIED); - TColumnEngineStats::TPortionsStats& srcStats = exPortionInfo - ? (exPortionInfo->HasRemoveSnapshot() - ? engineStats.StatsByType[TPortionMeta::EProduced::INACTIVE] - : engineStats.StatsByType[exPortionInfo->GetMeta().Produced]) - : engineStats.StatsByType[portionInfo.GetMeta().Produced]; - TColumnEngineStats::TPortionsStats& stats = portionInfo.HasRemoveSnapshot() - ? engineStats.StatsByType[TPortionMeta::EProduced::INACTIVE] - : engineStats.StatsByType[portionInfo.GetMeta().Produced]; + TColumnEngineStats::TPortionsStats& srcStats = + exPortionInfo ? (exPortionInfo->HasRemoveSnapshot() ? engineStats.StatsByType[TPortionMeta::EProduced::INACTIVE] + : engineStats.StatsByType[exPortionInfo->GetMeta().Produced]) + : engineStats.StatsByType[portionInfo.GetMeta().Produced]; + TColumnEngineStats::TPortionsStats& stats = portionInfo.HasRemoveSnapshot() ? 
engineStats.StatsByType[TPortionMeta::EProduced::INACTIVE] + : engineStats.StatsByType[portionInfo.GetMeta().Produced]; const bool isErase = updateType == EStatsUpdateType::ERASE; const bool isAdd = updateType == EStatsUpdateType::ADD; - if (isErase) { // PortionsToDrop + if (isErase) { // PortionsToDrop stats -= deltaStats; - } else if (isAdd) { // Load || AppendedPortions + } else if (isAdd) { // Load || AppendedPortions stats += deltaStats; - } else if (&srcStats != &stats || exPortionInfo) { // SwitchedPortions || PortionsToEvict + } else if (&srcStats != &stats || exPortionInfo) { // SwitchedPortions || PortionsToEvict stats += deltaStats; if (exPortionInfo) { @@ -206,10 +203,10 @@ bool TColumnEngineForLogs::LoadColumns(IDbWrapper& db) { NColumnShard::TLoadTimeSignals::TLoadTimer timer = SignalCounters.PortionsLoadingTimeCounters.StartGuard(); TMemoryProfileGuard g("TTxInit/LoadColumns/Portions"); if (!db.LoadPortions([&](TPortionInfoConstructor&& portion, const NKikimrTxColumnShard::TIndexPortionMeta& metaProto) { - const TIndexInfo& indexInfo = portion.GetSchema(VersionedIndex)->GetIndexInfo(); - AFL_VERIFY(portion.MutableMeta().LoadMetadata(metaProto, indexInfo)); - AFL_VERIFY(constructors.AddConstructorVerified(std::move(portion))); - })) { + const TIndexInfo& indexInfo = portion.GetSchema(VersionedIndex)->GetIndexInfo(); + AFL_VERIFY(portion.MutableMeta().LoadMetadata(metaProto, indexInfo)); + AFL_VERIFY(constructors.AddConstructorVerified(std::move(portion))); + })) { timer.AddLoadingFail(); return false; } @@ -219,11 +216,10 @@ bool TColumnEngineForLogs::LoadColumns(IDbWrapper& db) { NColumnShard::TLoadTimeSignals::TLoadTimer timer = SignalCounters.ColumnsLoadingTimeCounters.StartGuard(); TMemoryProfileGuard g("TTxInit/LoadColumns/Records"); TPortionInfo::TSchemaCursor schema(VersionedIndex); - if (!db.LoadColumns([&](TPortionInfoConstructor&& portion, const TColumnChunkLoadContext& loadContext) { - auto currentSchema = schema.GetSchema(portion); - 
auto* constructor = constructors.MergeConstructor(std::move(portion)); - constructor->LoadRecord(currentSchema->GetIndexInfo(), loadContext); - })) { + if (!db.LoadColumns([&](const TColumnChunkLoadContext& loadContext) { + auto* constructor = constructors.GetConstructorVerified(loadContext.GetPathId(), loadContext.GetPortionId()); + constructor->LoadRecord(loadContext); + })) { timer.AddLoadingFail(); return false; } @@ -231,11 +227,11 @@ bool TColumnEngineForLogs::LoadColumns(IDbWrapper& db) { { NColumnShard::TLoadTimeSignals::TLoadTimeSignals::TLoadTimer timer = SignalCounters.IndexesLoadingTimeCounters.StartGuard(); - TMemoryProfileGuard g("TTxInit/LoadColumns/Indexes"); + TMemoryProfileGuard g("TTxInit/LoadIndexes/Indexes"); if (!db.LoadIndexes([&](const ui64 pathId, const ui64 portionId, const TIndexChunkLoadContext& loadContext) { - auto* constructor = constructors.GetConstructorVerified(pathId, portionId); - constructor->LoadIndex(loadContext); - })) { + auto* constructor = constructors.GetConstructorVerified(pathId, portionId); + constructor->LoadIndex(loadContext); + })) { timer.AddLoadingFail(); return false; }; @@ -246,8 +242,7 @@ bool TColumnEngineForLogs::LoadColumns(IDbWrapper& db) { for (auto&& [granuleId, pathConstructors] : constructors) { auto g = GetGranulePtrVerified(granuleId); for (auto&& [portionId, constructor] : pathConstructors) { - auto portion = *constructor.Build(false).GetPortionInfoPtr(); - g->UpsertPortionOnLoad(std::move(portion)); + g->UpsertPortionOnLoad(constructor.Build(false).MutablePortionInfoPtr()); } } } @@ -263,18 +258,18 @@ bool TColumnEngineForLogs::LoadColumns(IDbWrapper& db) { bool TColumnEngineForLogs::LoadCounters(IDbWrapper& db) { auto callback = [&](ui32 id, ui64 value) { switch (id) { - case LAST_PORTION: - LastPortion = value; - break; - case LAST_GRANULE: - LastGranule = value; - break; - case LAST_PLAN_STEP: - LastSnapshot = TSnapshot(value, LastSnapshot.GetTxId()); - break; - case LAST_TX_ID: - LastSnapshot = 
TSnapshot(LastSnapshot.GetPlanStep(), value); - break; + case LAST_PORTION: + LastPortion = value; + break; + case LAST_GRANULE: + LastGranule = value; + break; + case LAST_PLAN_STEP: + LastSnapshot = TSnapshot(value, LastSnapshot.GetTxId()); + break; + case LAST_TX_ID: + LastSnapshot = TSnapshot(LastSnapshot.GetPlanStep(), value); + break; } }; @@ -297,7 +292,6 @@ std::shared_ptr<TInsertColumnEngineChanges> TColumnEngineForLogs::StartInsert(st if (!data.GetRemove()) { AFL_VERIFY(changes->PathToGranule.emplace(pathId, GetGranulePtrVerified(pathId)->GetBucketPositions()).second); } - } return changes; @@ -313,7 +307,8 @@ ui64 TColumnEngineForLogs::GetCompactionPriority(const std::shared_ptr<NDataLock } } -std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartCompaction(const std::shared_ptr<NDataLocks::TManager>& dataLocksManager) noexcept { +std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartCompaction( + const std::shared_ptr<NDataLocks::TManager>& dataLocksManager) noexcept { AFL_VERIFY(dataLocksManager); auto granule = GranulesStorage->GetGranuleForCompaction(dataLocksManager); if (!granule) { @@ -323,7 +318,8 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartCompaction(cons granule->OnStartCompaction(); auto changes = granule->GetOptimizationTask(granule, dataLocksManager); if (!changes) { - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "cannot build optimization task for granule that need compaction")("weight", granule->GetCompactionPriority().DebugString()); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "cannot build optimization task for granule that need compaction")( + "weight", granule->GetCompactionPriority().DebugString()); } return changes; } @@ -351,8 +347,8 @@ std::shared_ptr<TCleanupTablesColumnEngineChanges> TColumnEngineForLogs::StartCl return changes; } -std::shared_ptr<TCleanupPortionsColumnEngineChanges> TColumnEngineForLogs::StartCleanupPortions(const TSnapshot& snapshot, - const 
THashSet<ui64>& pathsToDrop, const std::shared_ptr<NDataLocks::TManager>& dataLocksManager) noexcept { +std::shared_ptr<TCleanupPortionsColumnEngineChanges> TColumnEngineForLogs::StartCleanupPortions( + const TSnapshot& snapshot, const THashSet<ui64>& pathsToDrop, const std::shared_ptr<NDataLocks::TManager>& dataLocksManager) noexcept { AFL_VERIFY(dataLocksManager); AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "StartCleanup")("portions_count", CleanupPortions.size()); auto changes = std::make_shared<TCleanupPortionsColumnEngineChanges>(StoragesManager); @@ -423,8 +419,8 @@ std::shared_ptr<TCleanupPortionsColumnEngineChanges> TColumnEngineForLogs::Start ++it; } } - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "StartCleanup") - ("portions_count", CleanupPortions.size())("portions_prepared", changes->PortionsToDrop.size())("drop", portionsFromDrop)("skip", skipLocked); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "StartCleanup")("portions_count", CleanupPortions.size())( + "portions_prepared", changes->PortionsToDrop.size())("drop", portionsFromDrop)("skip", skipLocked); if (changes->PortionsToDrop.empty()) { return nullptr; @@ -433,8 +429,8 @@ std::shared_ptr<TCleanupPortionsColumnEngineChanges> TColumnEngineForLogs::Start return changes; } -std::vector<std::shared_ptr<TTTLColumnEngineChanges>> TColumnEngineForLogs::StartTtl(const THashMap<ui64, TTiering>& pathEviction, const std::shared_ptr<NDataLocks::TManager>& dataLocksManager, - const ui64 memoryUsageLimit) noexcept { +std::vector<std::shared_ptr<TTTLColumnEngineChanges>> TColumnEngineForLogs::StartTtl(const THashMap<ui64, TTiering>& pathEviction, + const std::shared_ptr<NDataLocks::TManager>& dataLocksManager, const ui64 memoryUsageLimit) noexcept { AFL_VERIFY(dataLocksManager); AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "StartTtl")("external", pathEviction.size()); @@ -480,7 +476,8 @@ bool TColumnEngineForLogs::ApplyChangesOnTxCreate(std::shared_ptr<TColumnEngineC return true; } 
-bool TColumnEngineForLogs::ApplyChangesOnExecute(IDbWrapper& db, std::shared_ptr<TColumnEngineChanges> /*indexChanges*/, const TSnapshot& snapshot) noexcept { +bool TColumnEngineForLogs::ApplyChangesOnExecute( + IDbWrapper& db, std::shared_ptr<TColumnEngineChanges> /*indexChanges*/, const TSnapshot& snapshot) noexcept { db.WriteCounter(LAST_PORTION, LastPortion); db.WriteCounter(LAST_GRANULE, LastGranule); @@ -492,11 +489,11 @@ bool TColumnEngineForLogs::ApplyChangesOnExecute(IDbWrapper& db, std::shared_ptr return true; } -void TColumnEngineForLogs::AppendPortion(const TPortionInfo& portionInfo) { - auto granule = GetGranulePtrVerified(portionInfo.GetPathId()); - AFL_VERIFY(!granule->GetPortionOptional(portionInfo.GetPortionId())); - UpdatePortionStats(portionInfo, EStatsUpdateType::ADD); - granule->UpsertPortion(portionInfo); +void TColumnEngineForLogs::AppendPortion(const TPortionInfo::TPtr& portionInfo) { + auto granule = GetGranulePtrVerified(portionInfo->GetPathId()); + AFL_VERIFY(!granule->GetPortionOptional(portionInfo->GetPortionId())); + UpdatePortionStats(*portionInfo, EStatsUpdateType::ADD); + granule->AppendPortion(portionInfo); } bool TColumnEngineForLogs::ErasePortion(const TPortionInfo& portionInfo, bool updateStats) { @@ -553,7 +550,8 @@ std::shared_ptr<TSelectInfo> TColumnEngineForLogs::Select( return out; } -void TColumnEngineForLogs::OnTieringModified(const std::shared_ptr<NColumnShard::TTiersManager>& manager, const NColumnShard::TTtl& ttl, const std::optional<ui64> pathId) { +void TColumnEngineForLogs::OnTieringModified( + const std::shared_ptr<NColumnShard::TTiersManager>& manager, const NColumnShard::TTtl& ttl, const std::optional<ui64> pathId) { if (!ActualizationStarted) { for (auto&& i : GranulesStorage->GetTables()) { i.second->StartActualizationIndex(); @@ -565,12 +563,10 @@ void TColumnEngineForLogs::OnTieringModified(const std::shared_ptr<NColumnShard: THashMap<ui64, TTiering> tierings = manager->GetTiering(); ttl.AddTtls(tierings); - 
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "OnTieringModified") - ("new_count_tierings", tierings.size()) - ("new_count_ttls", ttl.PathsCount()); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "OnTieringModified")("new_count_tierings", tierings.size())( + "new_count_ttls", ttl.PathsCount()); // some string - if (pathId) { auto g = GetGranulePtrVerified(*pathId); auto it = tierings.find(*pathId); @@ -599,4 +595,4 @@ void TColumnEngineForLogs::DoRegisterTable(const ui64 pathId) { } } -} // namespace NKikimr::NOlap +} // namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.h b/ydb/core/tx/columnshard/engines/column_engine_logs.h index 6173bd43834..aa0f41a7cde 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.h +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.h @@ -192,14 +192,14 @@ public: template <class TModifier> void ModifyPortionOnComplete(const TPortionInfo::TConstPtr& portion, const TModifier& modifier) { - auto exPortion = *portion; + auto exPortion = portion->MakeCopy(); AFL_VERIFY(portion); auto granule = GetGranulePtrVerified(portion->GetPathId()); granule->ModifyPortionOnComplete(portion, modifier); UpdatePortionStats(*portion, EStatsUpdateType::DEFAULT, &exPortion); } - void AppendPortion(const TPortionInfo& portionInfo); + void AppendPortion(const TPortionInfo::TPtr& portionInfo); private: TVersionedIndex VersionedIndex; diff --git a/ydb/core/tx/columnshard/engines/db_wrapper.cpp b/ydb/core/tx/columnshard/engines/db_wrapper.cpp index 11ad657cd6b..fc63341d9d4 100644 --- a/ydb/core/tx/columnshard/engines/db_wrapper.cpp +++ b/ydb/core/tx/columnshard/engines/db_wrapper.cpp @@ -98,7 +98,7 @@ void TDbWrapper::EraseColumn(const NOlap::TPortionInfo& portion, const TColumnRe portion.GetMinSnapshotDeprecated().GetPlanStep(), portion.GetMinSnapshotDeprecated().GetTxId(), portion.GetPortionId(), row.Chunk).Delete(); } -bool TDbWrapper::LoadColumns(const 
std::function<void(NOlap::TPortionInfoConstructor&&, const TColumnChunkLoadContext&)>& callback) { +bool TDbWrapper::LoadColumns(const std::function<void(const TColumnChunkLoadContext&)>& callback) { NIceDb::TNiceDb db(Database); using IndexColumns = NColumnShard::Schema::IndexColumns; auto rowset = db.Table<IndexColumns>().Prefix(0).Select(); @@ -107,15 +107,8 @@ bool TDbWrapper::LoadColumns(const std::function<void(NOlap::TPortionInfoConstru } while (!rowset.EndOfSet()) { - NOlap::TSnapshot minSnapshot(rowset.GetValue<IndexColumns::PlanStep>(), rowset.GetValue<IndexColumns::TxId>()); - NOlap::TSnapshot removeSnapshot(rowset.GetValue<IndexColumns::XPlanStep>(), rowset.GetValue<IndexColumns::XTxId>()); - - NOlap::TPortionInfoConstructor constructor(rowset.GetValue<IndexColumns::PathId>(), rowset.GetValue<IndexColumns::Portion>()); - constructor.SetMinSnapshotDeprecated(minSnapshot); - constructor.SetRemoveSnapshot(removeSnapshot); - NOlap::TColumnChunkLoadContext chunkLoadContext(rowset, DsGroupSelector); - callback(std::move(constructor), chunkLoadContext); + callback(chunkLoadContext); if (!rowset.Next()) { return false; diff --git a/ydb/core/tx/columnshard/engines/db_wrapper.h b/ydb/core/tx/columnshard/engines/db_wrapper.h index 50958b6fca2..303dc75efd2 100644 --- a/ydb/core/tx/columnshard/engines/db_wrapper.h +++ b/ydb/core/tx/columnshard/engines/db_wrapper.h @@ -41,7 +41,7 @@ public: virtual void WriteColumn(const TPortionInfo& portion, const TColumnRecord& row, const ui32 firstPKColumnId) = 0; virtual void EraseColumn(const TPortionInfo& portion, const TColumnRecord& row) = 0; - virtual bool LoadColumns(const std::function<void(NOlap::TPortionInfoConstructor&&, const TColumnChunkLoadContext&)>& callback) = 0; + virtual bool LoadColumns(const std::function<void(const TColumnChunkLoadContext&)>& callback) = 0; virtual void WritePortion(const NOlap::TPortionInfo& portion) = 0; virtual void ErasePortion(const NOlap::TPortionInfo& portion) = 0; @@ -78,7 +78,7 @@ 
public: void WriteColumn(const NOlap::TPortionInfo& portion, const TColumnRecord& row, const ui32 firstPKColumnId) override; void EraseColumn(const NOlap::TPortionInfo& portion, const TColumnRecord& row) override; - bool LoadColumns(const std::function<void(NOlap::TPortionInfoConstructor&&, const TColumnChunkLoadContext&)>& callback) override; + bool LoadColumns(const std::function<void(const TColumnChunkLoadContext&)>& callback) override; virtual void WriteIndex(const TPortionInfo& portion, const TIndexChunk& row) override; virtual void EraseIndex(const TPortionInfo& portion, const TIndexChunk& row) override; diff --git a/ydb/core/tx/columnshard/engines/portions/constructor.cpp b/ydb/core/tx/columnshard/engines/portions/constructor.cpp index 273edd597c9..188a415a536 100644 --- a/ydb/core/tx/columnshard/engines/portions/constructor.cpp +++ b/ydb/core/tx/columnshard/engines/portions/constructor.cpp @@ -12,6 +12,22 @@ namespace NKikimr::NOlap { TPortionDataAccessor TPortionInfoConstructor::Build(const bool needChunksNormalization) { AFL_VERIFY(!Constructed); Constructed = true; + + MetaConstructor.ColumnRawBytes = 0; + MetaConstructor.ColumnBlobBytes = 0; + MetaConstructor.IndexRawBytes = 0; + MetaConstructor.IndexBlobBytes = 0; + + MetaConstructor.RecordsCount = GetRecordsCount(); + for (auto&& r : Records) { + *MetaConstructor.ColumnRawBytes += r.GetMeta().GetRawBytes(); + *MetaConstructor.ColumnBlobBytes += r.GetBlobRange().GetSize(); + } + for (auto&& r : Indexes) { + *MetaConstructor.IndexRawBytes += r.GetRawBytes(); + *MetaConstructor.IndexBlobBytes += r.GetDataSize(); + } + std::shared_ptr<TPortionInfo> result(new TPortionInfo(MetaConstructor.Build())); AFL_VERIFY(PathId); result->PathId = PathId; @@ -97,7 +113,6 @@ TPortionDataAccessor TPortionInfoConstructor::Build(const bool needChunksNormali result->Records.shrink_to_fit(); result->BlobIds = std::move(BlobIds); result->BlobIds.shrink_to_fit(); - result->Precalculate(); return TPortionDataAccessor(result); 
} @@ -111,13 +126,9 @@ ISnapshotSchema::TPtr TPortionInfoConstructor::GetSchema(const TVersionedIndex& return index.GetSchema(*MinSnapshotDeprecated); } -void TPortionInfoConstructor::LoadRecord(const TIndexInfo& indexInfo, const TColumnChunkLoadContext& loadContext) { +void TPortionInfoConstructor::LoadRecord(const TColumnChunkLoadContext& loadContext) { TColumnRecord rec(RegisterBlobId(loadContext.GetBlobRange().GetBlobId()), loadContext); Records.push_back(std::move(rec)); - - if (loadContext.GetPortionMeta()) { - AFL_VERIFY(MetaConstructor.LoadMetadata(*loadContext.GetPortionMeta(), indexInfo)); - } } void TPortionInfoConstructor::LoadIndex(const TIndexChunkLoadContext& loadContext) { diff --git a/ydb/core/tx/columnshard/engines/portions/constructor.h b/ydb/core/tx/columnshard/engines/portions/constructor.h index e42e92c190d..90e44218c1d 100644 --- a/ydb/core/tx/columnshard/engines/portions/constructor.h +++ b/ydb/core/tx/columnshard/engines/portions/constructor.h @@ -276,9 +276,10 @@ public: SetRemoveSnapshot(TSnapshot(planStep, txId)); } - void LoadRecord(const TIndexInfo& indexInfo, const TColumnChunkLoadContext& loadContext); + void LoadRecord(const TColumnChunkLoadContext& loadContext); ui32 GetRecordsCount() const { + AFL_VERIFY(Records.size()); ui32 result = 0; std::optional<ui32> columnIdFirst; for (auto&& i : Records) { diff --git a/ydb/core/tx/columnshard/engines/portions/constructor_meta.cpp b/ydb/core/tx/columnshard/engines/portions/constructor_meta.cpp index 948a199c960..dbb22213e48 100644 --- a/ydb/core/tx/columnshard/engines/portions/constructor_meta.cpp +++ b/ydb/core/tx/columnshard/engines/portions/constructor_meta.cpp @@ -42,16 +42,20 @@ TPortionMeta TPortionMetaConstructor::Build() { AFL_VERIFY(FirstAndLastPK); AFL_VERIFY(RecordSnapshotMin); AFL_VERIFY(RecordSnapshotMax); - AFL_VERIFY(CompactionLevel); TPortionMeta result(*FirstAndLastPK, *RecordSnapshotMin, *RecordSnapshotMax); if (TierName) { result.TierName = *TierName; } - 
result.CompactionLevel = *CompactionLevel; - AFL_VERIFY(DeletionsCount); - result.DeletionsCount = *DeletionsCount; - AFL_VERIFY(Produced); - result.Produced = *Produced; + result.CompactionLevel = *TValidator::CheckNotNull(CompactionLevel); + result.DeletionsCount = *TValidator::CheckNotNull(DeletionsCount); + result.Produced = *TValidator::CheckNotNull(Produced); + + result.RecordsCount = *TValidator::CheckNotNull(RecordsCount); + result.ColumnRawBytes = *TValidator::CheckNotNull(ColumnRawBytes); + result.ColumnBlobBytes = *TValidator::CheckNotNull(ColumnBlobBytes); + result.IndexRawBytes = *TValidator::CheckNotNull(IndexRawBytes); + result.IndexBlobBytes = *TValidator::CheckNotNull(IndexBlobBytes); + return result; } @@ -69,6 +73,11 @@ bool TPortionMetaConstructor::LoadMetadata(const NKikimrTxColumnShard::TIndexPor DeletionsCount = 0; } CompactionLevel = portionMeta.GetCompactionLevel(); + RecordsCount = TValidator::CheckNotNull(portionMeta.GetRecordsCount()); + ColumnRawBytes = TValidator::CheckNotNull(portionMeta.GetColumnRawBytes()); + ColumnBlobBytes = TValidator::CheckNotNull(portionMeta.GetColumnBlobBytes()); + IndexRawBytes = portionMeta.GetIndexRawBytes(); + IndexBlobBytes = portionMeta.GetIndexBlobBytes(); if (portionMeta.GetIsInserted()) { Produced = TPortionMeta::EProduced::INSERTED; } else if (portionMeta.GetIsCompacted()) { diff --git a/ydb/core/tx/columnshard/engines/portions/constructor_meta.h b/ydb/core/tx/columnshard/engines/portions/constructor_meta.h index 4c55771fdf4..2e36e389f6e 100644 --- a/ydb/core/tx/columnshard/engines/portions/constructor_meta.h +++ b/ydb/core/tx/columnshard/engines/portions/constructor_meta.h @@ -16,6 +16,13 @@ private: std::optional<TSnapshot> RecordSnapshotMax; std::optional<NPortion::EProduced> Produced; std::optional<ui64> CompactionLevel; + + std::optional<ui32> RecordsCount; + std::optional<ui64> ColumnRawBytes; + std::optional<ui32> ColumnBlobBytes; + std::optional<ui32> IndexRawBytes; + std::optional<ui32> 
IndexBlobBytes; + std::optional<ui32> DeletionsCount; friend class TPortionInfoConstructor; void FillMetaInfo(const NArrow::TFirstLastSpecialKeys& primaryKeys, const ui32 deletionsCount, const std::optional<NArrow::TMinMaxSpecialKeys>& snapshotKeys, const TIndexInfo& indexInfo); diff --git a/ydb/core/tx/columnshard/engines/portions/meta.cpp b/ydb/core/tx/columnshard/engines/portions/meta.cpp index 7019b577057..0c079d22cf2 100644 --- a/ydb/core/tx/columnshard/engines/portions/meta.cpp +++ b/ydb/core/tx/columnshard/engines/portions/meta.cpp @@ -13,6 +13,11 @@ NKikimrTxColumnShard::TIndexPortionMeta TPortionMeta::SerializeToProto() const { portionMeta.SetTierName(TierName); portionMeta.SetCompactionLevel(CompactionLevel); portionMeta.SetDeletionsCount(DeletionsCount); + portionMeta.SetRecordsCount(TValidator::CheckNotNull(RecordsCount)); + portionMeta.SetColumnRawBytes(TValidator::CheckNotNull(ColumnRawBytes)); + portionMeta.SetColumnBlobBytes(TValidator::CheckNotNull(ColumnBlobBytes)); + portionMeta.SetIndexRawBytes(IndexRawBytes); + portionMeta.SetIndexBlobBytes(IndexBlobBytes); switch (Produced) { case TPortionMeta::EProduced::UNSPECIFIED: Y_ABORT_UNLESS(false); diff --git a/ydb/core/tx/columnshard/engines/portions/meta.h b/ydb/core/tx/columnshard/engines/portions/meta.h index e45156b5d52..a7cc849f94d 100644 --- a/ydb/core/tx/columnshard/engines/portions/meta.h +++ b/ydb/core/tx/columnshard/engines/portions/meta.h @@ -17,6 +17,12 @@ private: YDB_READONLY_DEF(TString, TierName); YDB_READONLY(ui32, DeletionsCount, 0); YDB_READONLY(ui32, CompactionLevel, 0); + YDB_READONLY(ui32, RecordsCount, 0); + YDB_READONLY(ui64, ColumnRawBytes, 0); + YDB_READONLY(ui32, ColumnBlobBytes, 0); + YDB_READONLY(ui32, IndexRawBytes, 0); + YDB_READONLY(ui32, IndexBlobBytes, 0); + friend class TPortionMetaConstructor; friend class TPortionInfo; TPortionMeta(NArrow::TFirstLastSpecialKeys& pk, const TSnapshot& min, const TSnapshot& max) diff --git 
a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp index f4a9be0d0f8..419c857a67a 100644 --- a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp +++ b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp @@ -11,13 +11,11 @@ namespace NKikimr::NOlap { ui64 TPortionInfo::GetColumnRawBytes() const { - AFL_VERIFY(Precalculated); - return PrecalculatedColumnRawBytes; + return GetMeta().GetColumnRawBytes(); } ui64 TPortionInfo::GetColumnBlobBytes() const { - AFL_VERIFY(Precalculated); - return PrecalculatedColumnBlobBytes; + return GetMeta().GetColumnBlobBytes(); } TString TPortionInfo::DebugString(const bool withDetails) const { @@ -109,7 +107,6 @@ TConclusionStatus TPortionInfo::DeserializeFromProto(const NKikimrColumnShardDat } Indexes.emplace_back(std::move(parse.DetachResult())); } - Precalculate(); return TConclusionStatus::Success(); } @@ -170,33 +167,6 @@ NSplitter::TEntityGroups TPortionInfo::GetEntityGroupsByStorageId( } } -void TPortionInfo::Precalculate() { - AFL_VERIFY(!Precalculated); - Precalculated = true; - { - PrecalculatedColumnRawBytes = 0; - PrecalculatedColumnBlobBytes = 0; - PrecalculatedRecordsCount = 0; - const auto aggr = [&](const TColumnRecord& r) { - PrecalculatedColumnRawBytes += r.GetMeta().GetRawBytes(); - PrecalculatedColumnBlobBytes += r.BlobRange.GetSize(); - if (r.GetColumnId() == Records.front().GetColumnId()) { - PrecalculatedRecordsCount += r.GetMeta().GetRecordsCount(); - } - }; - TPortionDataAccessor::AggregateIndexChunksData(aggr, Records, nullptr, true); - } - { - PrecalculatedIndexRawBytes = 0; - PrecalculatedIndexBlobBytes = 0; - const auto aggr = [&](const TIndexChunk& r) { - PrecalculatedIndexRawBytes += r.GetRawBytes(); - PrecalculatedIndexBlobBytes += r.GetDataSize(); - }; - TPortionDataAccessor::AggregateIndexChunksData(aggr, Indexes, nullptr, true); - } -} - void TPortionInfo::SaveMetaToDatabase(IDbWrapper& db) const { FullValidation(); 
db.WritePortion(*this); diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.h b/ydb/core/tx/columnshard/engines/portions/portion_info.h index f2dc79a8c92..67af582c1a0 100644 --- a/ydb/core/tx/columnshard/engines/portions/portion_info.h +++ b/ydb/core/tx/columnshard/engines/portions/portion_info.h @@ -65,14 +65,8 @@ private: friend class TPortionDataAccessor; friend class TPortionInfoConstructor; - ui64 PrecalculatedColumnRawBytes = 0; - ui64 PrecalculatedColumnBlobBytes = 0; - ui64 PrecalculatedRecordsCount = 0; - ui64 PrecalculatedIndexBlobBytes = 0; - ui64 PrecalculatedIndexRawBytes = 0; - bool Precalculated = false; - - void Precalculate(); + TPortionInfo(const TPortionInfo&) = default; + TPortionInfo& operator=(const TPortionInfo&) = default; TPortionInfo(TPortionMeta&& meta) : Meta(std::move(meta)) { @@ -107,8 +101,15 @@ private: TConclusionStatus DeserializeFromProto(const NKikimrColumnShardDataSharingProto::TPortionInfo& proto); public: + TPortionInfo(TPortionInfo&&) = default; + TPortionInfo& operator=(TPortionInfo&&) = default; + void SaveMetaToDatabase(IDbWrapper& db) const; + TPortionInfo MakeCopy() const { + return *this; + } + const std::vector<TUnifiedBlobId>& GetBlobIds() const { return BlobIds; } @@ -433,18 +434,15 @@ public: ISnapshotSchema::TPtr GetSchema(const TVersionedIndex& index) const; ui32 GetRecordsCount() const { - AFL_VERIFY(Precalculated); - return PrecalculatedRecordsCount; + return GetMeta().GetRecordsCount(); } ui64 GetIndexBlobBytes() const noexcept { - AFL_VERIFY(Precalculated); - return PrecalculatedIndexBlobBytes; + return GetMeta().GetIndexBlobBytes(); } ui64 GetIndexRawBytes() const noexcept { - AFL_VERIFY(Precalculated); - return PrecalculatedIndexRawBytes; + return GetMeta().GetIndexRawBytes(); } ui64 GetColumnRawBytes() const; diff --git a/ydb/core/tx/columnshard/engines/protos/portion_info.proto b/ydb/core/tx/columnshard/engines/protos/portion_info.proto index b62f22790ce..5ecfa7608da 100644 --- 
a/ydb/core/tx/columnshard/engines/protos/portion_info.proto +++ b/ydb/core/tx/columnshard/engines/protos/portion_info.proto @@ -20,6 +20,11 @@ message TIndexPortionMeta { optional TSnapshot RecordSnapshotMax = 8; optional uint32 DeletionsCount = 10; optional uint64 CompactionLevel = 11 [default = 0]; + optional uint32 RecordsCount = 12; + optional uint64 ColumnRawBytes = 13; + optional uint32 ColumnBlobBytes = 14; + optional uint32 IndexBlobBytes = 15; + optional uint64 IndexRawBytes = 16; } message TIndexColumnMeta { @@ -27,5 +32,5 @@ message TIndexColumnMeta { optional uint32 RawBytes = 2; optional NKikimrSSA.TProgram.TConstant MinValue = 3; optional NKikimrSSA.TProgram.TConstant MaxValue = 4; - optional TIndexPortionMeta PortionMeta = 5; // First PK column could contain portion info + optional TIndexPortionMeta PortionMeta = 5[deprecated = true]; // First PK column could contain portion info } diff --git a/ydb/core/tx/columnshard/engines/storage/granule/granule.cpp b/ydb/core/tx/columnshard/engines/storage/granule/granule.cpp index c44a42b1c94..78a4a09d230 100644 --- a/ydb/core/tx/columnshard/engines/storage/granule/granule.cpp +++ b/ydb/core/tx/columnshard/engines/storage/granule/granule.cpp @@ -9,21 +9,16 @@ namespace NKikimr::NOlap { -void TGranuleMeta::UpsertPortion(const TPortionInfo& info) { - AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "upsert_portion")("portion", info.DebugString())("path_id", GetPathId()); - auto it = Portions.find(info.GetPortionId()); - AFL_VERIFY(info.GetPathId() == GetPathId())("event", "incompatible_granule")("portion", info.DebugString())("path_id", GetPathId()); +void TGranuleMeta::AppendPortion(const TPortionInfo::TPtr& info) { + AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "upsert_portion")("portion", info->DebugString())("path_id", GetPathId()); + auto it = Portions.find(info->GetPortionId()); + AFL_VERIFY(info->GetPathId() == GetPathId())("event", "incompatible_granule")("portion", 
info->DebugString())("path_id", GetPathId()); - AFL_VERIFY(info.ValidSnapshotInfo())("event", "incorrect_portion_snapshots")("portion", info.DebugString()); + AFL_VERIFY(info->ValidSnapshotInfo())("event", "incorrect_portion_snapshots")("portion", info->DebugString()); - if (it == Portions.end()) { - OnBeforeChangePortion(nullptr); - auto portionNew = std::make_shared<TPortionInfo>(info); - it = Portions.emplace(portionNew->GetPortionId(), portionNew).first; - } else { - OnBeforeChangePortion(it->second); - it->second = std::make_shared<TPortionInfo>(info); - } + AFL_VERIFY(it == Portions.end()); + OnBeforeChangePortion(nullptr); + it = Portions.emplace(info->GetPortionId(), info).first; OnAfterChangePortion(it->second, nullptr); } @@ -138,17 +133,14 @@ TGranuleMeta::TGranuleMeta( ActualizationIndex = std::make_shared<NActualizer::TGranuleActualizationIndex>(PathId, versionedIndex); } -std::shared_ptr<TPortionInfo> TGranuleMeta::UpsertPortionOnLoad(TPortionInfo&& portion) { - if (portion.HasInsertWriteId() && !portion.HasCommitSnapshot()) { - const TInsertWriteId insertWriteId = portion.GetInsertWriteIdVerified(); - auto emplaceInfo = InsertedPortions.emplace(insertWriteId, std::make_shared<TPortionInfo>(std::move(portion))); - AFL_VERIFY(emplaceInfo.second); - return emplaceInfo.first->second; +void TGranuleMeta::UpsertPortionOnLoad(const std::shared_ptr<TPortionInfo>&& portion) { + if (portion->HasInsertWriteId() && !portion->HasCommitSnapshot()) { + const TInsertWriteId insertWriteId = portion->GetInsertWriteIdVerified(); + AFL_VERIFY(InsertedPortions.emplace(insertWriteId, portion).second); + AFL_VERIFY(!Portions.contains(portion->GetPortionId())); } else { - auto portionId = portion.GetPortionId(); - auto emplaceInfo = Portions.emplace(portionId, std::make_shared<TPortionInfo>(std::move(portion))); - AFL_VERIFY(emplaceInfo.second); - return emplaceInfo.first->second; + auto portionId = portion->GetPortionId(); + AFL_VERIFY(Portions.emplace(portionId, 
portion).second); } } @@ -182,7 +174,7 @@ void TGranuleMeta::ResetOptimizer(const std::shared_ptr<NStorageOptimizer::IOpti void TGranuleMeta::CommitPortionOnComplete(const TInsertWriteId insertWriteId, IColumnEngine& engine) { auto it = InsertedPortions.find(insertWriteId); AFL_VERIFY(it != InsertedPortions.end()); - (static_cast<TColumnEngineForLogs&>(engine)).AppendPortion(*it->second); + (static_cast<TColumnEngineForLogs&>(engine)).AppendPortion(it->second); InsertedPortions.erase(it); } @@ -195,7 +187,7 @@ void TGranuleMeta::CommitImmediateOnExecute( } void TGranuleMeta::CommitImmediateOnComplete(const std::shared_ptr<TPortionInfo> portion, IColumnEngine& engine) { - (static_cast<TColumnEngineForLogs&>(engine)).AppendPortion(*portion); + (static_cast<TColumnEngineForLogs&>(engine)).AppendPortion(portion); } } // namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/engines/storage/granule/granule.h b/ydb/core/tx/columnshard/engines/storage/granule/granule.h index acf6d4b3418..c9686573e2f 100644 --- a/ydb/core/tx/columnshard/engines/storage/granule/granule.h +++ b/ydb/core/tx/columnshard/engines/storage/granule/granule.h @@ -134,12 +134,6 @@ private: void OnAdditiveSummaryChange() const; YDB_READONLY(TMonotonic, LastCompactionInstant, TMonotonic::Zero()); -public: - void RefreshTiering(const std::optional<TTiering>& tiering) { - NActualizer::TAddExternalContext context(HasAppData() ? AppDataVerified().TimeProvider->Now() : TInstant::Now(), Portions); - ActualizationIndex->RefreshTiering(tiering, context); - } - TConclusion<std::shared_ptr<TPortionInfo>> GetInnerPortion(const TPortionInfo::TConstPtr& portion) const { if (!portion) { return TConclusionStatus::Fail("empty input portion pointer"); @@ -155,12 +149,18 @@ public: return it->second; } +public: + void RefreshTiering(const std::optional<TTiering>& tiering) { + NActualizer::TAddExternalContext context(HasAppData() ? 
AppDataVerified().TimeProvider->Now() : TInstant::Now(), Portions); + ActualizationIndex->RefreshTiering(tiering, context); + } + template <class TModifier> void ModifyPortionOnExecute( IDbWrapper& wrapper, const TPortionInfo::TConstPtr& portion, const TModifier& modifier, const ui32 firstPKColumnId) const { const auto innerPortion = GetInnerPortion(portion).DetachResult(); AFL_VERIFY((ui64)innerPortion.get() == (ui64)portion.get()); - auto copy = *innerPortion; + auto copy = innerPortion->MakeCopy(); modifier(copy); TPortionDataAccessor(std::make_shared<TPortionInfo>(std::move(copy))).SaveToDatabase(wrapper, firstPKColumnId, false); } @@ -298,7 +298,7 @@ public: void OnCompactionFailed(const TString& reason); void OnCompactionFinished(); - void UpsertPortion(const TPortionInfo& info); + void AppendPortion(const TPortionInfo::TPtr& info); TString DebugString() const { return TStringBuilder() << "(granule:" << GetPathId() << ";" @@ -308,7 +308,7 @@ public: << ")"; } - std::shared_ptr<TPortionInfo> UpsertPortionOnLoad(TPortionInfo&& portion); + void UpsertPortionOnLoad(const std::shared_ptr<TPortionInfo>&& portion); const THashMap<ui64, std::shared_ptr<TPortionInfo>>& GetPortions() const { return Portions; diff --git a/ydb/core/tx/columnshard/engines/ut/ut_insert_table.cpp b/ydb/core/tx/columnshard/engines/ut/ut_insert_table.cpp index cac07cdd8ec..a3bb6bfd645 100644 --- a/ydb/core/tx/columnshard/engines/ut/ut_insert_table.cpp +++ b/ydb/core/tx/columnshard/engines/ut/ut_insert_table.cpp @@ -50,7 +50,7 @@ public: } void EraseColumn(const TPortionInfo&, const TColumnRecord&) override { } - bool LoadColumns(const std::function<void(NOlap::TPortionInfoConstructor&&, const TColumnChunkLoadContext&)>&) override { + bool LoadColumns(const std::function<void(const TColumnChunkLoadContext&)>&) override { return true; } diff --git a/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp b/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp index 1cabcd07dd1..817aef1ea22 100644 
--- a/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp +++ b/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp @@ -88,13 +88,21 @@ public: return true; } - virtual void WritePortion(const NOlap::TPortionInfo& /*portion*/) override { - + virtual void WritePortion(const NOlap::TPortionInfo& portion) override { + auto it = Portions.find(portion.GetPortionId()); + if (it == Portions.end()) { + Portions.emplace(portion.GetPortionId(), portion.MakeCopy()); + } else { + it->second = portion.MakeCopy(); + } } - virtual void ErasePortion(const NOlap::TPortionInfo& /*portion*/) override { - + virtual void ErasePortion(const NOlap::TPortionInfo& portion) override { + AFL_VERIFY(Portions.erase(portion.GetPortionId())); } - virtual bool LoadPortions(const std::function<void(NOlap::TPortionInfoConstructor&&, const NKikimrTxColumnShard::TIndexPortionMeta&)>& /*callback*/) override { + virtual bool LoadPortions(const std::function<void(NOlap::TPortionInfoConstructor&&, const NKikimrTxColumnShard::TIndexPortionMeta&)>& callback) override { + for (auto&& i : Portions) { + callback(NOlap::TPortionInfoConstructor(i.second, false, true), i.second.GetMeta().SerializeToProto()); + } return true; } @@ -105,7 +113,8 @@ public: } auto& data = Indices[0].Columns[portion.GetPathId()]; - NOlap::TColumnChunkLoadContext loadContext(row.GetAddress(), portion.RestoreBlobRange(row.BlobRange), rowProto); + NOlap::TColumnChunkLoadContext loadContext( + portion.GetPathId(), portion.GetPortionId(), row.GetAddress(), portion.RestoreBlobRange(row.BlobRange), rowProto); auto itInsertInfo = LoadContexts[portion.GetAddress()].emplace(row.GetAddress(), loadContext); if (!itInsertInfo.second) { itInsertInfo.first->second = loadContext; @@ -153,7 +162,7 @@ public: portionLocal.MutableRecords().swap(filtered); } - bool LoadColumns(const std::function<void(NOlap::TPortionInfoConstructor&&, const TColumnChunkLoadContext&)>& callback) override { + bool LoadColumns(const std::function<void(const 
TColumnChunkLoadContext&)>& callback) override { auto& columns = Indices[0].Columns; for (auto& [pathId, portions] : columns) { for (auto& [portionId, portionLocal] : portions) { @@ -163,7 +172,7 @@ public: auto itContextLoader = LoadContexts[copy.GetAddress()].find(rec.GetAddress()); Y_ABORT_UNLESS(itContextLoader != LoadContexts[copy.GetAddress()].end()); auto address = copy.GetAddress(); - callback(std::move(copy), itContextLoader->second); + callback(itContextLoader->second); LoadContexts[address].erase(itContextLoader); } } @@ -192,6 +201,7 @@ private: THashMap<TInsertWriteId, TInsertedData> Inserted; THashMap<ui64, TSet<TCommittedData>> Committed; THashMap<TInsertWriteId, TInsertedData> Aborted; + THashMap<ui64, NOlap::TPortionInfo> Portions; THashMap<ui32, TIndex> Indices; }; diff --git a/ydb/core/tx/columnshard/normalizer/abstract/abstract.h b/ydb/core/tx/columnshard/normalizer/abstract/abstract.h index e75099ecd9b..4cdd52799ee 100644 --- a/ydb/core/tx/columnshard/normalizer/abstract/abstract.h +++ b/ydb/core/tx/columnshard/normalizer/abstract/abstract.h @@ -52,13 +52,13 @@ public: enum class ENormalizerSequentialId: ui32 { Granules = 1, Chunks, - PortionsCleaner, TablesCleaner, - PortionsMetadata, CleanGranuleId, EmptyPortionsCleaner, CleanInsertionDedup, GCCountersNormalizer, + RestorePortionFromChunks, + SyncPortionFromChunks, MAX }; diff --git a/ydb/core/tx/columnshard/normalizer/portion/broken_blobs.cpp b/ydb/core/tx/columnshard/normalizer/portion/broken_blobs.cpp index c7a2cdf4d30..9772beef418 100644 --- a/ydb/core/tx/columnshard/normalizer/portion/broken_blobs.cpp +++ b/ydb/core/tx/columnshard/normalizer/portion/broken_blobs.cpp @@ -29,7 +29,7 @@ public: AFL_VERIFY(!!schema)("portion_id", portionInfo.GetPortionInfo().GetPortionId()); AFL_CRIT(NKikimrServices::TX_COLUMNSHARD)("event", "portion_removed_as_broken")( "portion_id", portionInfo.GetPortionInfo().GetAddress().DebugString()); - auto copy = portionInfo.GetPortionInfo(); + auto copy = 
portionInfo.GetPortionInfo().MakeCopy(); copy.SetRemoveSnapshot(TSnapshot(1, 1)); copy.SaveMetaToDatabase(db); } diff --git a/ydb/core/tx/columnshard/normalizer/portion/chunks_actualization.cpp b/ydb/core/tx/columnshard/normalizer/portion/chunks_actualization.cpp new file mode 100644 index 00000000000..5f14318750d --- /dev/null +++ b/ydb/core/tx/columnshard/normalizer/portion/chunks_actualization.cpp @@ -0,0 +1,184 @@ +#include "chunks_actualization.h" +#include "normalizer.h" + +#include <ydb/core/formats/arrow/size_calcer.h> +#include <ydb/core/tx/columnshard/engines/portions/data_accessor.h> +#include <ydb/core/tx/columnshard/engines/portions/portion_info.h> +#include <ydb/core/tx/columnshard/tables_manager.h> + +namespace NKikimr::NOlap::NSyncChunksWithPortions1 { + +class TPatchItem { +private: + TPortionLoadContext PortionInfo; + YDB_READONLY_DEF(std::vector<TColumnChunkLoadContext>, Records); + YDB_READONLY_DEF(std::vector<TIndexChunkLoadContext>, Indexes); + +public: + const TPortionLoadContext& GetPortionInfo() const { + return PortionInfo; + } + + TPatchItem(TPortionLoadContext&& portion, std::vector<TColumnChunkLoadContext>&& records, std::vector<TIndexChunkLoadContext>&& indexes) + : PortionInfo(std::move(portion)) + , Records(std::move(records)) + , Indexes(std::move(indexes)) + { + + } +}; + +class TChanges: public INormalizerChanges { +private: + std::vector<TPatchItem> Patches; + +public: + TChanges(std::vector<TPatchItem>&& patches) + : Patches(std::move(patches)) { + } + virtual bool ApplyOnExecute(NTabletFlatExecutor::TTransactionContext& txc, const TNormalizationController&) const override { + using namespace NColumnShard; + NIceDb::TNiceDb db(txc.DB); + for (auto&& i : Patches) { + auto meta = i.GetPortionInfo().GetMetaProto(); + ui32 recordsCount = 0; + ui64 columnRawBytes = 0; + ui64 indexRawBytes = 0; + ui32 columnBlobBytes = 0; + ui32 indexBlobBytes = 0; + + for (auto&& c : i.GetRecords()) { + columnRawBytes += 
c.GetMetaProto().GetRawBytes(); + columnBlobBytes += c.GetBlobRange().GetSize(); + if (i.GetRecords().front().GetAddress().GetColumnId() == c.GetAddress().GetColumnId()) { + recordsCount += c.GetMetaProto().GetNumRows(); + } + } + for (auto&& c : i.GetIndexes()) { + columnRawBytes += c.GetRawBytes(); + columnBlobBytes += c.GetDataSize(); + } + meta.SetRecordsCount(recordsCount); + meta.SetColumnRawBytes(columnRawBytes); + meta.SetColumnBlobBytes(columnBlobBytes); + meta.SetIndexRawBytes(indexRawBytes); + meta.SetIndexBlobBytes(indexBlobBytes); + + db.Table<Schema::IndexPortions>() + .Key(i.GetPortionInfo().GetPathId(), i.GetPortionInfo().GetPortionId()) + .Update(NIceDb::TUpdate<Schema::IndexPortions::Metadata>(meta.SerializeAsString())); + } + + return true; + } + + virtual ui64 GetSize() const override { + return Patches.size(); + } + +}; + +TConclusion<std::vector<INormalizerTask::TPtr>> TNormalizer::DoInit( + const TNormalizationController& /*controller*/, NTabletFlatExecutor::TTransactionContext& txc) { + using namespace NColumnShard; + NIceDb::TNiceDb db(txc.DB); + + bool ready = true; + ready = ready & Schema::Precharge<Schema::IndexColumns>(db, txc.DB.GetScheme()); + ready = ready & Schema::Precharge<Schema::IndexPortions>(db, txc.DB.GetScheme()); + ready = ready & Schema::Precharge<Schema::IndexIndexes>(db, txc.DB.GetScheme()); + if (!ready) { + return TConclusionStatus::Fail("Not ready"); + } + + THashMap<ui64, TPortionLoadContext> dbPortions; + THashMap<ui64, std::vector<TColumnChunkLoadContext>> recordsByPortion; + THashMap<ui64, std::vector<TIndexChunkLoadContext>> indexesByPortion; + + { + auto rowset = db.Table<Schema::IndexPortions>().Select(); + if (!rowset.IsReady()) { + return TConclusionStatus::Fail("Not ready"); + } + + while (!rowset.EndOfSet()) { + TPortionLoadContext portion(rowset); + AFL_VERIFY(dbPortions.emplace(portion.GetPortionId(), portion).second); + + if (!rowset.Next()) { + return TConclusionStatus::Fail("Not ready"); + } + } + } + 
+ { + auto rowset = db.Table<Schema::IndexColumns>().Select(); + if (!rowset.IsReady()) { + return TConclusionStatus::Fail("Not ready"); + } + + while (!rowset.EndOfSet()) { + TColumnChunkLoadContext chunk(rowset, &DsGroupSelector); + const ui64 portionId = chunk.GetPortionId(); + AFL_VERIFY(dbPortions.contains(portionId)); + recordsByPortion[portionId].emplace_back(std::move(chunk)); + + if (!rowset.Next()) { + return TConclusionStatus::Fail("Not ready"); + } + } + } + + { + auto rowset = db.Table<Schema::IndexIndexes>().Select(); + if (!rowset.IsReady()) { + return TConclusionStatus::Fail("Not ready"); + } + + while (!rowset.EndOfSet()) { + TIndexChunkLoadContext chunk(rowset, &DsGroupSelector); + const ui64 portionId = chunk.GetPortionId(); + AFL_VERIFY(dbPortions.contains(portionId)); + indexesByPortion[portionId].emplace_back(std::move(chunk)); + + if (!rowset.Next()) { + return TConclusionStatus::Fail("Not ready"); + } + } + } + AFL_VERIFY(dbPortions.size() == recordsByPortion.size())("portions", dbPortions.size())("records", recordsByPortion.size()); + + for (auto&& i : indexesByPortion) { + AFL_VERIFY(dbPortions.contains(i.first)); + } + + std::vector<INormalizerTask::TPtr> tasks; + if (dbPortions.empty()) { + return tasks; + } + + std::vector<TPatchItem> package; + + for (auto&& [_, portion] : dbPortions) { + if (portion.GetMetaProto().GetRecordsCount()) { + continue; + } + auto itRecords = recordsByPortion.find(portion.GetPortionId()); + AFL_VERIFY(itRecords != recordsByPortion.end()); + auto itIndexes = indexesByPortion.find(portion.GetPortionId()); + auto indexes = (itIndexes == indexesByPortion.end()) ? 
Default<std::vector<TIndexChunkLoadContext>>() : itIndexes->second; + package.emplace_back(std::move(portion), std::move(itRecords->second), std::move(indexes)); + if (package.size() == 100) { + std::vector<TPatchItem> local; + local.swap(package); + tasks.emplace_back(std::make_shared<TTrivialNormalizerTask>(std::make_shared<TChanges>(std::move(local)))); + } + } + + if (package.size() > 0) { + tasks.emplace_back(std::make_shared<TTrivialNormalizerTask>(std::make_shared<TChanges>(std::move(package)))); + } + return tasks; +} + +} // namespace NKikimr::NOlap::NChunksActualization diff --git a/ydb/core/tx/columnshard/normalizer/portion/chunks_actualization.h b/ydb/core/tx/columnshard/normalizer/portion/chunks_actualization.h new file mode 100644 index 00000000000..71b1b1fd44f --- /dev/null +++ b/ydb/core/tx/columnshard/normalizer/portion/chunks_actualization.h @@ -0,0 +1,43 @@ +#pragma once + +#include <ydb/core/tx/columnshard/columnshard_schema.h> +#include <ydb/core/tx/columnshard/defs.h> +#include <ydb/core/tx/columnshard/normalizer/abstract/abstract.h> + +namespace NKikimr::NColumnShard { +class TTablesManager; +} + +namespace NKikimr::NOlap::NSyncChunksWithPortions1 { + +class TNormalizer: public TNormalizationController::INormalizerComponent { +public: + static TString GetClassNameStatic() { + return ::ToString(ENormalizerSequentialId::SyncPortionFromChunks); + } + + virtual std::optional<ENormalizerSequentialId> DoGetEnumSequentialId() const override { + return ENormalizerSequentialId::SyncPortionFromChunks; + } + + virtual TString GetClassName() const override { + return GetClassNameStatic(); + } + + class TNormalizerResult; + + static inline INormalizerComponent::TFactory::TRegistrator<TNormalizer> Registrator = + INormalizerComponent::TFactory::TRegistrator<TNormalizer>(GetClassNameStatic()); + +public: + TNormalizer(const TNormalizationController::TInitContext& info) + : DsGroupSelector(info.GetStorageInfo()) { + } + + virtual 
TConclusion<std::vector<INormalizerTask::TPtr>> DoInit( + const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) override; + +private: + NColumnShard::TBlobGroupSelector DsGroupSelector; +}; +} // namespace NKikimr::NOlap::NChunksActualization diff --git a/ydb/core/tx/columnshard/normalizer/portion/clean.h b/ydb/core/tx/columnshard/normalizer/portion/clean.h index a9b4b95bd06..6b01d65b377 100644 --- a/ydb/core/tx/columnshard/normalizer/portion/clean.h +++ b/ydb/core/tx/columnshard/normalizer/portion/clean.h @@ -14,7 +14,7 @@ namespace NKikimr::NOlap { class TCleanPortionsNormalizer: public TPortionsNormalizerBase { public: static TString GetClassNameStatic() { - return ::ToString(ENormalizerSequentialId::PortionsCleaner); + return "PortionsCleaner"; } private: @@ -27,7 +27,7 @@ public: public: virtual std::optional<ENormalizerSequentialId> DoGetEnumSequentialId() const override { - return ENormalizerSequentialId::PortionsCleaner; + return std::nullopt; } virtual TString GetClassName() const override { diff --git a/ydb/core/tx/columnshard/normalizer/portion/normalizer.cpp b/ydb/core/tx/columnshard/normalizer/portion/normalizer.cpp index 5bda885409b..d581a6579d8 100644 --- a/ydb/core/tx/columnshard/normalizer/portion/normalizer.cpp +++ b/ydb/core/tx/columnshard/normalizer/portion/normalizer.cpp @@ -129,7 +129,7 @@ TConclusionStatus TPortionsNormalizerBase::InitColumns( } else { it->second.Merge(std::move(portion)); } - it->second.LoadRecord(currentSchema->GetIndexInfo(), loadContext); + it->second.LoadRecord(loadContext); }; while (!rowset.EndOfSet()) { diff --git a/ydb/core/tx/columnshard/normalizer/portion/portion.h b/ydb/core/tx/columnshard/normalizer/portion/portion.h index 134b2d94ba6..812e51412f3 100644 --- a/ydb/core/tx/columnshard/normalizer/portion/portion.h +++ b/ydb/core/tx/columnshard/normalizer/portion/portion.h @@ -14,7 +14,7 @@ namespace NKikimr::NOlap { class TPortionsNormalizer : public TPortionsNormalizerBase { 
public: static TString GetClassNameStatic() { - return ::ToString(ENormalizerSequentialId::PortionsMetadata); + return "PortionsMetadata"; } private: @@ -26,7 +26,7 @@ public: public: virtual std::optional<ENormalizerSequentialId> DoGetEnumSequentialId() const override { - return ENormalizerSequentialId::PortionsMetadata; + return std::nullopt; } virtual TString GetClassName() const override { diff --git a/ydb/core/tx/columnshard/normalizer/portion/restore_portion_from_chunks.cpp b/ydb/core/tx/columnshard/normalizer/portion/restore_portion_from_chunks.cpp new file mode 100644 index 00000000000..f4fb377f3b7 --- /dev/null +++ b/ydb/core/tx/columnshard/normalizer/portion/restore_portion_from_chunks.cpp @@ -0,0 +1,147 @@ +#include "normalizer.h" +#include "restore_portion_from_chunks.h" + +#include <ydb/core/formats/arrow/size_calcer.h> +#include <ydb/core/tx/columnshard/engines/portions/data_accessor.h> +#include <ydb/core/tx/columnshard/engines/portions/portion_info.h> +#include <ydb/core/tx/columnshard/tables_manager.h> + +namespace NKikimr::NOlap::NRestorePortionsFromChunks { + +class TPatchItem { +private: + YDB_READONLY(ui32, SchemaVersion, 0); + TColumnChunkLoadContext ChunkInfo; + +public: + const TColumnChunkLoadContext& GetChunkInfo() const { + return ChunkInfo; + } + + TPatchItem(const ui32 schemaVersion, TColumnChunkLoadContext&& chunkInfo) + : SchemaVersion(schemaVersion) + , ChunkInfo(std::move(chunkInfo)) { + } +}; + +class TChanges: public INormalizerChanges { +private: + std::vector<TPatchItem> Patches; + +public: + TChanges(std::vector<TPatchItem>&& patches) + : Patches(std::move(patches)) { + } + virtual bool ApplyOnExecute(NTabletFlatExecutor::TTransactionContext& txc, const TNormalizationController&) const override { + using namespace NColumnShard; + NIceDb::TNiceDb db(txc.DB); + for (auto&& i : Patches) { + AFL_VERIFY(i.GetChunkInfo().GetMetaProto().HasPortionMeta()); + auto metaProtoString = 
i.GetChunkInfo().GetMetaProto().GetPortionMeta().SerializeAsString(); + using IndexPortions = NColumnShard::Schema::IndexPortions; + const auto removeSnapshot = i.GetChunkInfo().GetRemoveSnapshot(); + const auto minSnapshotDeprecated = i.GetChunkInfo().GetMinSnapshotDeprecated(); + db.Table<IndexPortions>() + .Key(i.GetChunkInfo().GetPathId(), i.GetChunkInfo().GetPortionId()) + .Update(NIceDb::TUpdate<IndexPortions::SchemaVersion>(i.GetSchemaVersion()), NIceDb::TUpdate<IndexPortions::ShardingVersion>(0), + NIceDb::TUpdate<IndexPortions::CommitPlanStep>(0), NIceDb::TUpdate<IndexPortions::CommitTxId>(0), + NIceDb::TUpdate<IndexPortions::InsertWriteId>(0), NIceDb::TUpdate<IndexPortions::XPlanStep>(removeSnapshot.GetPlanStep()), + NIceDb::TUpdate<IndexPortions::XTxId>(removeSnapshot.GetTxId()), + NIceDb::TUpdate<IndexPortions::MinSnapshotPlanStep>(minSnapshotDeprecated.GetPlanStep()), + NIceDb::TUpdate<IndexPortions::MinSnapshotTxId>(minSnapshotDeprecated.GetTxId()), + NIceDb::TUpdate<IndexPortions::Metadata>(metaProtoString)); + } + + return true; + } + + virtual ui64 GetSize() const override { + return Patches.size(); + } +}; + +TConclusion<std::vector<INormalizerTask::TPtr>> TNormalizer::DoInit( + const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) { + using namespace NColumnShard; + NIceDb::TNiceDb db(txc.DB); + + bool ready = true; + ready = ready & Schema::Precharge<Schema::IndexColumns>(db, txc.DB.GetScheme()); + ready = ready & Schema::Precharge<Schema::IndexPortions>(db, txc.DB.GetScheme()); + if (!ready) { + return TConclusionStatus::Fail("Not ready"); + } + + TTablesManager tablesManager(controller.GetStoragesManager(), 0); + if (!tablesManager.InitFromDB(db)) { + ACFL_TRACE("normalizer", "TChunksNormalizer")("error", "can't initialize tables manager"); + return TConclusionStatus::Fail("Can't load index"); + } + + THashMap<ui64, TPortionLoadContext> dbPortions; + + { + auto rowset = 
db.Table<Schema::IndexPortions>().Select(); + if (!rowset.IsReady()) { + return TConclusionStatus::Fail("Not ready"); + } + + while (!rowset.EndOfSet()) { + TPortionLoadContext portion(rowset); + AFL_VERIFY(dbPortions.emplace(portion.GetPortionId(), portion).second); + + if (!rowset.Next()) { + return TConclusionStatus::Fail("Not ready"); + } + } + } + + THashMap<ui64, TColumnChunkLoadContext> portionsToWrite; + { + auto rowset = db.Table<Schema::IndexColumns>().Select(); + if (!rowset.IsReady()) { + return TConclusionStatus::Fail("Not ready"); + } + + THashSet<ui64> portionsToRestore; + while (!rowset.EndOfSet()) { + TColumnChunkLoadContext chunk(rowset, &DsGroupSelector); + if (!dbPortions.contains(chunk.GetPortionId())) { + portionsToRestore.emplace(chunk.GetPortionId()); + if (chunk.GetMetaProto().HasPortionMeta()) { + AFL_VERIFY(portionsToWrite.emplace(chunk.GetPortionId(), chunk).second); + } + } + + if (!rowset.Next()) { + return TConclusionStatus::Fail("Not ready"); + } + } + AFL_VERIFY(portionsToRestore.size() == portionsToWrite.size()); + } + + std::vector<INormalizerTask::TPtr> tasks; + if (portionsToWrite.empty()) { + return tasks; + } + + std::vector<TPatchItem> package; + + for (auto&& [_, chunkWithPortionData] : portionsToWrite) { + package.emplace_back( + tablesManager.GetPrimaryIndexSafe().GetVersionedIndex().GetSchema(chunkWithPortionData.GetMinSnapshotDeprecated())->GetVersion(), + std::move(chunkWithPortionData)); + if (package.size() == 100) { + std::vector<TPatchItem> local; + local.swap(package); + tasks.emplace_back(std::make_shared<TTrivialNormalizerTask>(std::make_shared<TChanges>(std::move(local)))); + } + } + + if (package.size() > 0) { + tasks.emplace_back(std::make_shared<TTrivialNormalizerTask>(std::make_shared<TChanges>(std::move(package)))); + } + return tasks; +} + +} // namespace NKikimr::NOlap::NRestorePortionsFromChunks diff --git a/ydb/core/tx/columnshard/normalizer/portion/restore_portion_from_chunks.h 
b/ydb/core/tx/columnshard/normalizer/portion/restore_portion_from_chunks.h new file mode 100644 index 00000000000..f8120c16ef3 --- /dev/null +++ b/ydb/core/tx/columnshard/normalizer/portion/restore_portion_from_chunks.h @@ -0,0 +1,43 @@ +#pragma once + +#include <ydb/core/tx/columnshard/columnshard_schema.h> +#include <ydb/core/tx/columnshard/defs.h> +#include <ydb/core/tx/columnshard/normalizer/abstract/abstract.h> + +namespace NKikimr::NColumnShard { +class TTablesManager; +} + +namespace NKikimr::NOlap::NRestorePortionsFromChunks { + +class TNormalizer: public TNormalizationController::INormalizerComponent { +public: + static TString GetClassNameStatic() { + return ::ToString(ENormalizerSequentialId::RestorePortionFromChunks); + } + + virtual std::optional<ENormalizerSequentialId> DoGetEnumSequentialId() const override { + return ENormalizerSequentialId::RestorePortionFromChunks; + } + + virtual TString GetClassName() const override { + return GetClassNameStatic(); + } + + class TNormalizerResult; + + static inline INormalizerComponent::TFactory::TRegistrator<TNormalizer> Registrator = + INormalizerComponent::TFactory::TRegistrator<TNormalizer>(GetClassNameStatic()); + +public: + TNormalizer(const TNormalizationController::TInitContext& info) + : DsGroupSelector(info.GetStorageInfo()) { + } + + virtual TConclusion<std::vector<INormalizerTask::TPtr>> DoInit( + const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) override; + +private: + NColumnShard::TBlobGroupSelector DsGroupSelector; +}; +} // namespace NKikimr::NOlap::NRestorePortionsFromChunks diff --git a/ydb/core/tx/columnshard/normalizer/portion/ya.make b/ydb/core/tx/columnshard/normalizer/portion/ya.make index e53cf5497b8..386cbe711bc 100644 --- a/ydb/core/tx/columnshard/normalizer/portion/ya.make +++ b/ydb/core/tx/columnshard/normalizer/portion/ya.make @@ -8,6 +8,8 @@ SRCS( GLOBAL clean_empty.cpp GLOBAL broken_blobs.cpp GLOBAL special_cleaner.cpp + GLOBAL
chunks_actualization.cpp + GLOBAL restore_portion_from_chunks.cpp ) PEERDIR( diff --git a/ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp b/ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp index 7fe1cb04405..bfd5bd88909 100644 --- a/ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp +++ b/ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp @@ -37,66 +37,12 @@ public: virtual ~TNormalizerChecker() { } - virtual ui64 RecordsCountAfterReboot(const ui64 initialRecodsCount) const { - return initialRecodsCount; - } -}; - -class TPathIdCleaner: public NYDBTest::ILocalDBModifier { -public: - virtual void Apply(NTabletFlatExecutor::TTransactionContext& txc) const override { - using namespace NColumnShard; - NIceDb::TNiceDb db(txc.DB); - - THashMap<ui64, TPortionRecord> portion2Key; - std::optional<ui64> pathId; - { - auto rowset = db.Table<Schema::IndexColumns>().Select(); - UNIT_ASSERT(rowset.IsReady()); - - while (!rowset.EndOfSet()) { - TPortionRecord key; - key.Index = rowset.GetValue<Schema::IndexColumns::Index>(); - key.Granule = rowset.GetValue<Schema::IndexColumns::Granule>(); - key.ColumnIdx = rowset.GetValue<Schema::IndexColumns::ColumnIdx>(); - key.PlanStep = rowset.GetValue<Schema::IndexColumns::PlanStep>(); - key.TxId = rowset.GetValue<Schema::IndexColumns::TxId>(); - key.Portion = rowset.GetValue<Schema::IndexColumns::Portion>(); - key.Chunk = rowset.GetValue<Schema::IndexColumns::Chunk>(); - - key.XPlanStep = rowset.GetValue<Schema::IndexColumns::XPlanStep>(); - key.XTxId = rowset.GetValue<Schema::IndexColumns::XTxId>(); - key.Blob = rowset.GetValue<Schema::IndexColumns::Blob>(); - key.Metadata = rowset.GetValue<Schema::IndexColumns::Metadata>(); - key.Offset = rowset.GetValue<Schema::IndexColumns::Offset>(); - key.Size = rowset.GetValue<Schema::IndexColumns::Size>(); - - pathId = rowset.GetValue<Schema::IndexColumns::PathId>(); - - portion2Key[key.Portion] = key; - - UNIT_ASSERT(rowset.Next()); - } - } - - UNIT_ASSERT(pathId.has_value()); - - for (auto&& [portionId, 
key] : portion2Key) { - db.Table<Schema::IndexColumns>().Key(key.Index, key.Granule, key.ColumnIdx, key.PlanStep, key.TxId, key.Portion, key.Chunk).Delete(); + virtual void CorrectConfigurationOnStart(NKikimrConfig::TColumnShardConfig& /*columnShardConfig*/) const { - db.Table<Schema::IndexColumns>() - .Key(key.Index, 1, key.ColumnIdx, key.PlanStep, key.TxId, key.Portion, key.Chunk) - .Update(NIceDb::TUpdate<Schema::IndexColumns::XPlanStep>(key.XPlanStep), NIceDb::TUpdate<Schema::IndexColumns::XTxId>(key.XTxId), - NIceDb::TUpdate<Schema::IndexColumns::Blob>(key.Blob), NIceDb::TUpdate<Schema::IndexColumns::Metadata>(key.Metadata), - NIceDb::TUpdate<Schema::IndexColumns::Offset>(key.Offset), NIceDb::TUpdate<Schema::IndexColumns::Size>(key.Size), - - NIceDb::TNull<Schema::IndexColumns::PathId>()); - } + } - db.Table<Schema::IndexGranules>() - .Key(0, *pathId, "1") - .Update(NIceDb::TUpdate<Schema::IndexGranules::Granule>(1), NIceDb::TUpdate<Schema::IndexGranules::PlanStep>(1), - NIceDb::TUpdate<Schema::IndexGranules::TxId>(1), NIceDb::TUpdate<Schema::IndexGranules::Metadata>("")); + virtual ui64 RecordsCountAfterReboot(const ui64 initialRecodsCount) const { + return initialRecodsCount; } }; @@ -308,9 +254,7 @@ Y_UNIT_TEST_SUITE(Normalizers) { TTestBasicRuntime runtime; TTester::Setup(runtime); - auto* repair = runtime.GetAppData().ColumnShardConfig.MutableRepairs()->Add(); - repair->SetClassName("SchemaVersionCleaner"); - repair->SetDescription("Removing unused schema versions"); + checker.CorrectConfigurationOnStart(runtime.GetAppData().ColumnShardConfig); const ui64 tableId = 1; const std::vector<NArrow::NTest::TTestColumn> schema = { NArrow::NTest::TTestColumn("key1", TTypeInfo(NTypeIds::Uint64)), @@ -344,10 +288,6 @@ Y_UNIT_TEST_SUITE(Normalizers) { } } - Y_UNIT_TEST(PathIdNormalizer) { - TestNormalizerImpl<TPathIdCleaner>(); - } - Y_UNIT_TEST(ColumnChunkNormalizer) { TestNormalizerImpl<TColumnChunksCleaner>(); } @@ -357,7 +297,15 @@ Y_UNIT_TEST_SUITE(Normalizers) 
{ } Y_UNIT_TEST(SchemaVersionsNormalizer) { - TestNormalizerImpl<TSchemaVersionsCleaner>(); + class TLocalNormalizerChecker: public TNormalizerChecker { + public: + virtual void CorrectConfigurationOnStart(NKikimrConfig::TColumnShardConfig& columnShardConfig) const override { + auto* repair = columnShardConfig.MutableRepairs()->Add(); + repair->SetClassName("SchemaVersionCleaner"); + repair->SetDescription("Removing unused schema versions"); + } + }; + TestNormalizerImpl<TSchemaVersionsCleaner>(TLocalNormalizerChecker()); } Y_UNIT_TEST(CleanEmptyPortionsNormalizer) { @@ -371,6 +319,11 @@ Y_UNIT_TEST_SUITE(Normalizers) { ui64 RecordsCountAfterReboot(const ui64) const override { return 0; } + virtual void CorrectConfigurationOnStart(NKikimrConfig::TColumnShardConfig& columnShardConfig) const override { + auto* repair = columnShardConfig.MutableRepairs()->Add(); + repair->SetClassName("PortionsCleaner"); + repair->SetDescription("Removing dirty portions withno tables"); + } }; TLocalNormalizerChecker checker; TestNormalizerImpl<TTablesCleaner>(checker); |