diff options
author | ivanmorozov333 <ivanmorozov@ydb.tech> | 2025-05-04 16:53:49 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-05-04 16:53:49 +0300 |
commit | dd4139765d39e76eb3b9af25f52a9f929f125d0d (patch) | |
tree | 4effa307a5ebb107dcf4a104779c81e343b84669 | |
parent | 6b6327610ba481e9335a036c6d44815e4cd6ba97 (diff) | |
download | ydb-dd4139765d39e76eb3b9af25f52a9f929f125d0d.tar.gz |
dont fill deprecated field (#17976)
Co-authored-by: ivanmorozov333 <imorozov333@ya.ru>
19 files changed, 21 insertions, 443 deletions
diff --git a/ydb/core/tx/columnshard/data_sharing/protos/data.proto b/ydb/core/tx/columnshard/data_sharing/protos/data.proto index 6ead2d5241e..257ca28ce84 100644 --- a/ydb/core/tx/columnshard/data_sharing/protos/data.proto +++ b/ydb/core/tx/columnshard/data_sharing/protos/data.proto @@ -29,7 +29,6 @@ message TIndexChunk { message TPortionInfo { optional uint64 PathId = 1; optional uint64 PortionId = 2; - optional NKikimrColumnShardProto.TSnapshot MinSnapshotDeprecated = 3; optional NKikimrColumnShardProto.TSnapshot RemoveSnapshot = 4; optional NKikimrTxColumnShard.TIndexPortionMeta Meta = 5; repeated TColumnRecord Records = 6; diff --git a/ydb/core/tx/columnshard/engines/db_wrapper.cpp b/ydb/core/tx/columnshard/engines/db_wrapper.cpp index 06f8e9898e5..07abcff14ad 100644 --- a/ydb/core/tx/columnshard/engines/db_wrapper.cpp +++ b/ydb/core/tx/columnshard/engines/db_wrapper.cpp @@ -162,10 +162,8 @@ bool TDbWrapper::LoadPortions(const std::optional<TInternalPathId> pathId, portion->SetShardingVersion(rowset.template GetValue<IndexPortions::ShardingVersion>()); } portion->SetRemoveSnapshot(rowset.template GetValue<IndexPortions::XPlanStep>(), rowset.template GetValue<IndexPortions::XTxId>()); - if (rowset.template GetValue<IndexPortions::MinSnapshotPlanStep>()) { - portion->SetMinSnapshotDeprecated(TSnapshot( - rowset.template GetValue<IndexPortions::MinSnapshotPlanStep>(), rowset.template GetValue<IndexPortions::MinSnapshotTxId>())); - } + portion->SetMinSnapshotDeprecated(TSnapshot( + rowset.template GetValue<IndexPortions::MinSnapshotPlanStep>(), rowset.template GetValue<IndexPortions::MinSnapshotTxId>())); NKikimrTxColumnShard::TIndexPortionMeta metaProto; const TString metadata = rowset.template GetValue<NColumnShard::Schema::IndexPortions::Metadata>(); diff --git a/ydb/core/tx/columnshard/engines/portions/constructor_portion.cpp b/ydb/core/tx/columnshard/engines/portions/constructor_portion.cpp index a190023c7e4..5300bb325e1 100644 --- a/ydb/core/tx/columnshard/engines/portions/constructor_portion.cpp +++ b/ydb/core/tx/columnshard/engines/portions/constructor_portion.cpp @@ -28,7 +28,6 @@ std::shared_ptr<TPortionInfo> TPortionInfoConstructor::Build() { result->PortionId = GetPortionIdVerified(); AFL_VERIFY(MinSnapshotDeprecated); - AFL_VERIFY(MinSnapshotDeprecated->Valid()); result->MinSnapshotDeprecated = *MinSnapshotDeprecated; if (RemoveSnapshot) { AFL_VERIFY(RemoveSnapshot->Valid()); diff --git a/ydb/core/tx/columnshard/engines/portions/constructor_portion.h b/ydb/core/tx/columnshard/engines/portions/constructor_portion.h index dbc67fbc0a5..2a7d4186453 100644 --- a/ydb/core/tx/columnshard/engines/portions/constructor_portion.h +++ b/ydb/core/tx/columnshard/engines/portions/constructor_portion.h @@ -122,15 +122,19 @@ public: std::shared_ptr<ISnapshotSchema> GetSchema(const TVersionedIndex& index) const; void SetMinSnapshotDeprecated(const TSnapshot& snap) { - Y_ABORT_UNLESS(snap.Valid()); MinSnapshotDeprecated = snap; } void SetSchemaVersion(const ui64 version) { - // AFL_VERIFY(version); + AFL_VERIFY(version); SchemaVersion = version; } + ui64 GetSchemaVersionVerified() const { + AFL_VERIFY(SchemaVersion); + return *SchemaVersion; + } + void SetShardingVersion(const ui64 version) { // AFL_VERIFY(version); ShardingVersion = version; diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp index be0457b5a1f..350252c857e 100644 --- a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp +++ b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp @@ -59,7 +59,6 @@ void TPortionInfo::SerializeToProto(NKikimrColumnShardDataSharingProto::TPortion proto.SetPathId(PathId.GetRawValue()); proto.SetPortionId(PortionId); proto.SetSchemaVersion(GetSchemaVersionVerified()); - *proto.MutableMinSnapshotDeprecated() = MinSnapshotDeprecated.SerializeToProto(); if (!RemoveSnapshot.IsZero()) { *proto.MutableRemoveSnapshot() = RemoveSnapshot.SerializeToProto(); } @@ -74,12 +73,6 @@ TConclusionStatus TPortionInfo::DeserializeFromProto(const NKikimrColumnShardDat if (!SchemaVersion) { return TConclusionStatus::Fail("portion's schema version cannot been equals to zero"); } - { - auto parse = MinSnapshotDeprecated.DeserializeFromProto(proto.GetMinSnapshotDeprecated()); - if (!parse) { - return parse; - } - } if (proto.HasRemoveSnapshot()) { auto parse = RemoveSnapshot.DeserializeFromProto(proto.GetRemoveSnapshot()); if (!parse) { @@ -96,15 +89,6 @@ ISnapshotSchema::TPtr TPortionInfo::GetSchema(const TVersionedIndex& index) cons return schema; } -ISnapshotSchema::TPtr TPortionInfo::TSchemaCursor::GetSchema(const TPortionInfoConstructor& portion) { - if (!CurrentSchema || portion.GetMinSnapshotDeprecatedVerified() != LastSnapshot) { - CurrentSchema = portion.GetSchema(VersionedIndex); - LastSnapshot = portion.GetMinSnapshotDeprecatedVerified(); - } - AFL_VERIFY(!!CurrentSchema); - return CurrentSchema; -} - bool TPortionInfo::NeedShardingFilter(const TGranuleShardingInfo& shardingInfo) const { if (ShardingVersion && shardingInfo.GetSnapshotVersion() <= *ShardingVersion) { return false; diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.h b/ydb/core/tx/columnshard/engines/portions/portion_info.h index 624a981e58e..a2b395deefc 100644 --- a/ydb/core/tx/columnshard/engines/portions/portion_info.h +++ b/ydb/core/tx/columnshard/engines/portions/portion_info.h @@ -101,7 +101,6 @@ private: void FullValidation() const { AFL_VERIFY(PathId); AFL_VERIFY(PortionId); - AFL_VERIFY(MinSnapshotDeprecated.Valid()); AFL_VERIFY(SchemaVersion); Meta.FullValidation(); } @@ -120,7 +119,6 @@ public: virtual bool IsCommitted() const = 0; const TSnapshot& GetMinSnapshotDeprecated() const { - AFL_VERIFY(MinSnapshotDeprecated.Valid()); return MinSnapshotDeprecated; } @@ -321,7 +319,7 @@ public: } bool ValidSnapshotInfo() const { - return MinSnapshotDeprecated.Valid() && PathId && PortionId; + return SchemaVersion && PathId && PortionId; } TString DebugString(const bool withDetails = false) const; @@ -400,28 +398,6 @@ public: virtual const TSnapshot& RecordSnapshotMin(const std::optional<TSnapshot>& snapshotDefault = std::nullopt) const = 0; virtual const TSnapshot& RecordSnapshotMax(const std::optional<TSnapshot>& snapshotDefault = std::nullopt) const = 0; - class TSchemaCursor { - const NOlap::TVersionedIndex& VersionedIndex; - ISnapshotSchema::TPtr CurrentSchema; - TSnapshot LastSnapshot = TSnapshot::Zero(); - - public: - TSchemaCursor(const NOlap::TVersionedIndex& versionedIndex) - : VersionedIndex(versionedIndex) { - } - - ISnapshotSchema::TPtr GetSchema(const TPortionInfoConstructor& portion); - - ISnapshotSchema::TPtr GetSchema(const TPortionInfo& portion) { - if (!CurrentSchema || portion.MinSnapshotDeprecated != LastSnapshot) { - CurrentSchema = portion.GetSchema(VersionedIndex); - LastSnapshot = portion.MinSnapshotDeprecated; - } - AFL_VERIFY(!!CurrentSchema)("portion", portion.DebugString()); - return CurrentSchema; - } - }; - ISnapshotSchema::TPtr GetSchema(const TVersionedIndex& index) const; ui32 GetRecordsCount() const { diff --git a/ydb/core/tx/columnshard/engines/portions/read_with_blobs.cpp b/ydb/core/tx/columnshard/engines/portions/read_with_blobs.cpp index 6c040921608..bcdcb1ff10d 100644 --- a/ydb/core/tx/columnshard/engines/portions/read_with_blobs.cpp +++ b/ydb/core/tx/columnshard/engines/portions/read_with_blobs.cpp @@ -117,7 +117,7 @@ std::optional<TWritePortionInfoWithBlobsResult> TReadPortionInfoWithBlobs::SyncP } TPortionAccessorConstructor constructor = TPortionAccessorConstructor::BuildForRewriteBlobs(source.PortionInfo.GetPortionInfo()); - constructor.MutablePortionConstructor().SetMinSnapshotDeprecated(to->GetSnapshot()); + constructor.MutablePortionConstructor().SetMinSnapshotDeprecated(TSnapshot(0, 0)); constructor.MutablePortionConstructor().SetSchemaVersion(to->GetVersion()); constructor.MutablePortionConstructor().MutableMeta().ResetTierName(targetTier); diff --git a/ydb/core/tx/columnshard/engines/portions/write_with_blobs.cpp b/ydb/core/tx/columnshard/engines/portions/write_with_blobs.cpp index 8e0141913d7..1c440c9f62c 100644 --- a/ydb/core/tx/columnshard/engines/portions/write_with_blobs.cpp +++ b/ydb/core/tx/columnshard/engines/portions/write_with_blobs.cpp @@ -30,7 +30,7 @@ void TWritePortionInfoWithBlobsResult::TBlobInfo::RegisterBlobId(TWritePortionIn TWritePortionInfoWithBlobsConstructor TWritePortionInfoWithBlobsConstructor::BuildByBlobs(std::vector<TSplittedBlob>&& chunks, const THashMap<ui32, std::shared_ptr<IPortionDataChunk>>& inplaceChunks, const TInternalPathId granule, const ui64 schemaVersion, - const TSnapshot& snapshot, const std::shared_ptr<IStoragesManager>& operators, const EPortionType type) { + const TSnapshot& /*snapshot*/, const std::shared_ptr<IStoragesManager>& operators, const EPortionType type) { TPortionAccessorConstructor constructor = [&]() { switch (type) { case EPortionType::Written: @@ -39,7 +39,7 @@ TWritePortionInfoWithBlobsConstructor TWritePortionInfoWithBlobsConstructor::Bui return TPortionAccessorConstructor(std::make_unique<TCompactedPortionInfoConstructor>(granule)); } }(); - constructor.MutablePortionConstructor().SetMinSnapshotDeprecated(snapshot); + constructor.MutablePortionConstructor().SetMinSnapshotDeprecated(TSnapshot(0, 0)); constructor.MutablePortionConstructor().SetSchemaVersion(schemaVersion); return BuildByBlobs(std::move(chunks), inplaceChunks, std::move(constructor), operators); } diff --git a/ydb/core/tx/columnshard/engines/storage/granule/granule.cpp b/ydb/core/tx/columnshard/engines/storage/granule/granule.cpp index 15e2be8e9d8..109b018667e 100644 --- a/ydb/core/tx/columnshard/engines/storage/granule/granule.cpp +++ b/ydb/core/tx/columnshard/engines/storage/granule/granule.cpp @@ -245,7 +245,6 @@ bool TGranuleMeta::TestingLoad(IDbWrapper& db, const TVersionedIndex& versionedI } { - TPortionInfo::TSchemaCursor schema(versionedIndex); if (!db.LoadColumns(PathId, [&](TColumnChunkLoadContextV2&& loadContext) { auto* constructor = constructors.GetConstructorVerified(loadContext.GetPortionId()); for (auto&& i : loadContext.BuildRecordsV1()) { diff --git a/ydb/core/tx/columnshard/engines/storage/granule/stages.cpp b/ydb/core/tx/columnshard/engines/storage/granule/stages.cpp index bd38f91069d..48c7963c26c 100644 --- a/ydb/core/tx/columnshard/engines/storage/granule/stages.cpp +++ b/ydb/core/tx/columnshard/engines/storage/granule/stages.cpp @@ -29,7 +29,6 @@ bool TGranuleOnlyPortionsReader::DoPrecharge(NTabletFlatExecutor::TTransactionCo bool TGranuleColumnsReader::DoExecute(NTabletFlatExecutor::TTransactionContext& txc, const TActorContext& /*ctx*/) { TDbWrapper db(txc.DB, &*DsGroupSelector); - TPortionInfo::TSchemaCursor schema(*VersionedIndex); Context->ClearRecords(); return db.LoadColumns(Self->GetPathId(), [&](TColumnChunkLoadContextV2&& loadContext) { Context->Add(std::move(loadContext)); diff --git a/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp b/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp index 3cf2eaf180e..1dfd474bf4d 100644 --- a/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp +++ b/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp @@ -138,7 +138,7 @@ public: Y_ABORT_UNLESS(portion.GetPathId() == it->second.MutablePortionConstructor().GetPathId() && portion.GetPortionId() == it->second.MutablePortionConstructor().GetPortionIdVerified()); } - it->second.MutablePortionConstructor().SetMinSnapshotDeprecated(portion.GetMinSnapshotDeprecated()); + it->second.MutablePortionConstructor().SetSchemaVersion(portion.GetSchemaVersionVerified()); if (portion.HasRemoveSnapshot()) { if (!it->second.MutablePortionConstructor().HasRemoveSnapshot()) { it->second.MutablePortionConstructor().SetRemoveSnapshot(portion.GetRemoveSnapshotVerified()); diff --git a/ydb/core/tx/columnshard/normalizer/abstract/abstract.cpp b/ydb/core/tx/columnshard/normalizer/abstract/abstract.cpp index 7ca284d9d8d..4bb3ce6a641 100644 --- a/ydb/core/tx/columnshard/normalizer/abstract/abstract.cpp +++ b/ydb/core/tx/columnshard/normalizer/abstract/abstract.cpp @@ -100,7 +100,9 @@ void TNormalizationController::InitNormalizers(const TInitContext& ctx) { if (::ToString(nType).StartsWith("Deprecated")) { continue; } - auto normalizer = RegisterNormalizer(std::shared_ptr<INormalizerComponent>(INormalizerComponent::TFactory::Construct(::ToString(nType), ctx))); + auto component = INormalizerComponent::TFactory::MakeHolder(::ToString(nType), ctx); + AFL_VERIFY(component)("class_name", ::ToString(nType)); + auto normalizer = RegisterNormalizer(std::shared_ptr<INormalizerComponent>(component.Release())); AFL_VERIFY(normalizer->GetEnumSequentialIdVerified() == nType); AFL_VERIFY(lastRegisteredNormalizer <= nType)("current", ToString(nType))("last", ToString(lastRegisteredNormalizer)); lastRegisteredNormalizer = nType; diff --git a/ydb/core/tx/columnshard/normalizer/abstract/abstract.h b/ydb/core/tx/columnshard/normalizer/abstract/abstract.h index 636177db8f7..8af30bb63f5 100644 --- a/ydb/core/tx/columnshard/normalizer/abstract/abstract.h +++ b/ydb/core/tx/columnshard/normalizer/abstract/abstract.h @@ -51,6 +51,7 @@ public: } }; +// DONT REMOVE AND DONT CHANGE PLACES! PERSISTENT! ADD Deprecated PREFIX FOR REMOVED NORMALIZER enum class ENormalizerSequentialId : ui32 { Granules = 1, Chunks, @@ -61,10 +62,10 @@ enum class ENormalizerSequentialId : ui32 { DeprecatedEmptyPortionsCleaner, CleanInsertionDedup, GCCountersNormalizer, - RestorePortionFromChunks, + DeprecatedRestorePortionFromChunks, SyncPortionFromChunks, DeprecatedRestoreV1Chunks, - SyncMinSnapshotFromChunks, + DeprecatedSyncMinSnapshotFromChunks, DeprecatedRestoreV1Chunks_V1, RestoreV1Chunks_V2, RestoreV2Chunks, diff --git a/ydb/core/tx/columnshard/normalizer/portion/normalizer.cpp b/ydb/core/tx/columnshard/normalizer/portion/normalizer.cpp index 0408e14b7f6..0005d193f67 100644 --- a/ydb/core/tx/columnshard/normalizer/portion/normalizer.cpp +++ b/ydb/core/tx/columnshard/normalizer/portion/normalizer.cpp @@ -57,9 +57,9 @@ TConclusion<std::vector<INormalizerTask::TPtr>> TPortionsNormalizerBase::DoInit( return conclusion; } } - TPortionInfo::TSchemaCursor schema(tablesManager.GetPrimaryIndexSafe().GetVersionedIndex()); for (auto&& [_, p] : portions) { - (*schemas)[p.GetPortionConstructor().GetPortionIdVerified()] = schema.GetSchema(p.GetPortionConstructor()); + (*schemas)[p.GetPortionConstructor().GetPortionIdVerified()] = + tablesManager.GetPrimaryIndexSafe().GetVersionedIndex().GetSchemaVerified(p.GetPortionConstructor().GetSchemaVersionVerified()); } std::vector<TPortionDataAccessor> package; @@ -116,7 +116,6 @@ TConclusionStatus TPortionsNormalizerBase::InitColumns( return TConclusionStatus::Fail("Not ready"); } - TPortionInfo::TSchemaCursor schema(tablesManager.GetPrimaryIndexSafe().GetVersionedIndex()); auto initPortion = [&](TColumnChunkLoadContextV1&& loadContext) { if (!columnsFilter.empty() && !columnsFilter.contains(loadContext.GetAddress().GetColumnId())) { return; diff --git a/ydb/core/tx/columnshard/normalizer/portion/restore_portion_from_chunks.cpp b/ydb/core/tx/columnshard/normalizer/portion/restore_portion_from_chunks.cpp deleted file mode 100644 index 480e27ef055..00000000000 --- a/ydb/core/tx/columnshard/normalizer/portion/restore_portion_from_chunks.cpp +++ /dev/null @@ -1,148 +0,0 @@ -#include "normalizer.h" -#include "restore_portion_from_chunks.h" - -#include <ydb/core/formats/arrow/size_calcer.h> -#include <ydb/core/tx/columnshard/engines/portions/data_accessor.h> -#include <ydb/core/tx/columnshard/engines/portions/portion_info.h> -#include <ydb/core/tx/columnshard/tables_manager.h> - -namespace NKikimr::NOlap::NRestorePortionsFromChunks { - -class TPatchItem { -private: - YDB_READONLY(ui32, SchemaVersion, 0); - TColumnChunkLoadContext ChunkInfo; - -public: - const TColumnChunkLoadContext& GetChunkInfo() const { - return ChunkInfo; - } - - TPatchItem(const ui32 schemaVersion, TColumnChunkLoadContext&& chunkInfo) - : SchemaVersion(schemaVersion) - , ChunkInfo(std::move(chunkInfo)) { - } -}; - -class TChanges: public INormalizerChanges { -private: - std::vector<TPatchItem> Patches; - -public: - TChanges(std::vector<TPatchItem>&& patches) - : Patches(std::move(patches)) { - } - virtual bool ApplyOnExecute(NTabletFlatExecutor::TTransactionContext& txc, const TNormalizationController&) const override { - using namespace NColumnShard; - NIceDb::TNiceDb db(txc.DB); - for (auto&& i : Patches) { - AFL_VERIFY(i.GetChunkInfo().GetMetaProto().HasPortionMeta()); - auto metaProtoString = i.GetChunkInfo().GetMetaProto().GetPortionMeta().SerializeAsString(); - using IndexPortions = NColumnShard::Schema::IndexPortions; - const auto removeSnapshot = i.GetChunkInfo().GetRemoveSnapshot(); - const auto minSnapshotDeprecated = i.GetChunkInfo().GetMinSnapshotDeprecated(); - db.Table<IndexPortions>() - .Key(i.GetChunkInfo().GetPathId().GetRawValue(), i.GetChunkInfo().GetPortionId()) - .Update(NIceDb::TUpdate<IndexPortions::SchemaVersion>(i.GetSchemaVersion()), NIceDb::TUpdate<IndexPortions::ShardingVersion>(0), - NIceDb::TUpdate<IndexPortions::CommitPlanStep>(0), NIceDb::TUpdate<IndexPortions::CommitTxId>(0), - NIceDb::TUpdate<IndexPortions::InsertWriteId>(0), NIceDb::TUpdate<IndexPortions::XPlanStep>(removeSnapshot.GetPlanStep()), - NIceDb::TUpdate<IndexPortions::XTxId>(removeSnapshot.GetTxId()), - NIceDb::TUpdate<IndexPortions::MinSnapshotPlanStep>(minSnapshotDeprecated.GetPlanStep()), - NIceDb::TUpdate<IndexPortions::MinSnapshotTxId>(minSnapshotDeprecated.GetTxId()), - NIceDb::TUpdate<IndexPortions::Metadata>(metaProtoString)); - } - - return true; - } - - virtual ui64 GetSize() const override { - return Patches.size(); - } -}; - -TConclusion<std::vector<INormalizerTask::TPtr>> TNormalizer::DoInit( - const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) { - using namespace NColumnShard; - NIceDb::TNiceDb db(txc.DB); - - bool ready = true; - ready = ready & Schema::Precharge<Schema::IndexColumns>(db, txc.DB.GetScheme()); - ready = ready & Schema::Precharge<Schema::IndexPortions>(db, txc.DB.GetScheme()); - if (!ready) { - return TConclusionStatus::Fail("Not ready"); - } - - TTablesManager tablesManager(controller.GetStoragesManager(), std::make_shared<NDataAccessorControl::TLocalManager>(nullptr), - std::make_shared<TSchemaObjectsCache>(), std::make_shared<TPortionIndexStats>(), 0); - if (!tablesManager.InitFromDB(db)) { - ACFL_TRACE("normalizer", "TChunksNormalizer")("error", "can't initialize tables manager"); - return TConclusionStatus::Fail("Can't load index"); - } - - THashMap<ui64, TPortionLoadContext> dbPortions; - - { - auto rowset = db.Table<Schema::IndexPortions>().Select(); - if (!rowset.IsReady()) { - return TConclusionStatus::Fail("Not ready"); - } - - while (!rowset.EndOfSet()) { - TPortionLoadContext portion(rowset); - AFL_VERIFY(dbPortions.emplace(portion.GetPortionId(), portion).second); - - if (!rowset.Next()) { - return TConclusionStatus::Fail("Not ready"); - } - } - } - - THashMap<ui64, TColumnChunkLoadContext> portionsToWrite; - { - auto rowset = db.Table<Schema::IndexColumns>().Select(); - if (!rowset.IsReady()) { - return TConclusionStatus::Fail("Not ready"); - } - - THashSet<ui64> portionsToRestore; - while (!rowset.EndOfSet()) { - TColumnChunkLoadContext chunk(rowset, &DsGroupSelector); - if (!dbPortions.contains(chunk.GetPortionId())) { - portionsToRestore.emplace(chunk.GetPortionId()); - if (chunk.GetMetaProto().HasPortionMeta()) { - AFL_VERIFY(portionsToWrite.emplace(chunk.GetPortionId(), chunk).second); - } - } - - if (!rowset.Next()) { - return TConclusionStatus::Fail("Not ready"); - } - } - AFL_VERIFY(portionsToRestore.size() == portionsToWrite.size()); - } - - std::vector<INormalizerTask::TPtr> tasks; - if (portionsToWrite.empty()) { - return tasks; - } - - std::vector<TPatchItem> package; - - for (auto&& [_, chunkWithPortionData] : portionsToWrite) { - package.emplace_back( - tablesManager.GetPrimaryIndexSafe().GetVersionedIndex().GetSchemaVerified(chunkWithPortionData.GetMinSnapshotDeprecated())->GetVersion(), - std::move(chunkWithPortionData)); - if (package.size() == 100) { - std::vector<TPatchItem> local; - local.swap(package); - tasks.emplace_back(std::make_shared<TTrivialNormalizerTask>(std::make_shared<TChanges>(std::move(local)))); - } - } - - if (package.size() > 0) { - tasks.emplace_back(std::make_shared<TTrivialNormalizerTask>(std::make_shared<TChanges>(std::move(package)))); - } - return tasks; -} - -} // namespace NKikimr::NOlap::NRestorePortionsFromChunks diff --git a/ydb/core/tx/columnshard/normalizer/portion/restore_portion_from_chunks.h b/ydb/core/tx/columnshard/normalizer/portion/restore_portion_from_chunks.h deleted file mode 100644 index 110b0e35a7c..00000000000 --- a/ydb/core/tx/columnshard/normalizer/portion/restore_portion_from_chunks.h +++ /dev/null @@ -1,47 +0,0 @@ -#pragma once - -#include <ydb/core/tx/columnshard/columnshard_schema.h> -#include <ydb/core/tx/columnshard/defs.h> -#include <ydb/core/tx/columnshard/normalizer/abstract/abstract.h> - -namespace NKikimr::NColumnShard { -class TTablesManager; -} - -namespace NKikimr::NOlap::NRestorePortionsFromChunks { - -class TNormalizer: public TNormalizationController::INormalizerComponent { -private: - using TBase = TNormalizationController::INormalizerComponent; - -public: - static TString GetClassNameStatic() { - return ::ToString(ENormalizerSequentialId::RestorePortionFromChunks); - } - - virtual std::optional<ENormalizerSequentialId> DoGetEnumSequentialId() const override { - return ENormalizerSequentialId::RestorePortionFromChunks; - } - - virtual TString GetClassName() const override { - return GetClassNameStatic(); - } - - class TNormalizerResult; - - static inline INormalizerComponent::TFactory::TRegistrator<TNormalizer> Registrator = - INormalizerComponent::TFactory::TRegistrator<TNormalizer>(GetClassNameStatic()); - -public: - TNormalizer(const TNormalizationController::TInitContext& info) - : TBase(info) - , DsGroupSelector(info.GetStorageInfo()) { - } - - virtual TConclusion<std::vector<INormalizerTask::TPtr>> DoInit( - const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) override; - -private: - NColumnShard::TBlobGroupSelector DsGroupSelector; -}; -} // namespace NKikimr::NOlap::NRestorePortionsFromChunks diff --git a/ydb/core/tx/columnshard/normalizer/portion/snapshot_from_chunks.cpp b/ydb/core/tx/columnshard/normalizer/portion/snapshot_from_chunks.cpp deleted file mode 100644 index 1423109e9d2..00000000000 --- a/ydb/core/tx/columnshard/normalizer/portion/snapshot_from_chunks.cpp +++ /dev/null @@ -1,138 +0,0 @@ -#include <ydb/core/tx/columnshard/common/path_id.h> -#include "snapshot_from_chunks.h" -#include "normalizer.h" - -#include <ydb/core/formats/arrow/size_calcer.h> -#include <ydb/core/tx/columnshard/engines/portions/data_accessor.h> -#include <ydb/core/tx/columnshard/engines/portions/portion_info.h> -#include <ydb/core/tx/columnshard/tables_manager.h> - -namespace NKikimr::NOlap::NSyncMinSnapshotFromChunks { - -class TPatchItem { -private: - TPortionLoadContext PortionInfo; - YDB_READONLY(NOlap::TSnapshot, Snapshot, NOlap::TSnapshot::Zero()); - -public: - const TPortionLoadContext& GetPortionInfo() const { - return PortionInfo; - } - - TPatchItem(TPortionLoadContext&& portion, const NOlap::TSnapshot& snapshot) - : PortionInfo(std::move(portion)) - , Snapshot(snapshot) { - } -}; - -class TChanges: public INormalizerChanges { -private: - std::vector<TPatchItem> Patches; - -public: - TChanges(std::vector<TPatchItem>&& patches) - : Patches(std::move(patches)) { - } - virtual bool ApplyOnExecute(NTabletFlatExecutor::TTransactionContext& txc, const TNormalizationController&) const override { - using namespace NColumnShard; - NIceDb::TNiceDb db(txc.DB); - for (auto&& i : Patches) { - db.Table<Schema::IndexPortions>() - .Key(i.GetPortionInfo().GetPathId().GetRawValue(), i.GetPortionInfo().GetPortionId()) - .Update(NIceDb::TUpdate<Schema::IndexPortions::MinSnapshotPlanStep>(i.GetSnapshot().GetPlanStep()), - NIceDb::TUpdate<Schema::IndexPortions::MinSnapshotTxId>(i.GetSnapshot().GetTxId()) - ); - } - - return true; - } - - virtual ui64 GetSize() const override { - return Patches.size(); - } - -}; - -TConclusion<std::vector<INormalizerTask::TPtr>> TNormalizer::DoInit( - const TNormalizationController& /*controller*/, NTabletFlatExecutor::TTransactionContext& txc) { - using namespace NColumnShard; - NIceDb::TNiceDb db(txc.DB); - - bool ready = true; - ready = ready & Schema::Precharge<Schema::IndexPortions>(db, txc.DB.GetScheme()); - ready = ready & Schema::Precharge<Schema::IndexColumns>(db, txc.DB.GetScheme()); - if (!ready) { - return TConclusionStatus::Fail("Not ready"); - } - - THashMap<ui64, TPortionLoadContext> dbPortions; - THashMap<ui64, NOlap::TSnapshot> initSnapshot; - - { - auto rowset = db.Table<Schema::IndexPortions>().Select(); - if (!rowset.IsReady()) { - return TConclusionStatus::Fail("Not ready"); - } - - while (!rowset.EndOfSet()) { - TPortionLoadContext portion(rowset); - if (!portion.GetDeprecatedMinSnapshot()) { - AFL_VERIFY(dbPortions.emplace(portion.GetPortionId(), portion).second); - } - - if (!rowset.Next()) { - return TConclusionStatus::Fail("Not ready"); - } - } - } - - { - auto rowset = db.Table<Schema::IndexColumns>().Select(); - if (!rowset.IsReady()) { - return TConclusionStatus::Fail("Not ready"); - } - - while (!rowset.EndOfSet()) { - TColumnChunkLoadContext chunk(rowset, &DsGroupSelector); - const ui64 portionId = chunk.GetPortionId(); - if (dbPortions.contains(portionId)) { - auto it = initSnapshot.find(portionId); - if (it == initSnapshot.end()) { - initSnapshot.emplace(portionId, chunk.GetMinSnapshotDeprecated()); - } else { - AFL_VERIFY(it->second == chunk.GetMinSnapshotDeprecated()); - } - } - - if (!rowset.Next()) { - return TConclusionStatus::Fail("Not ready"); - } - } - } - AFL_VERIFY(dbPortions.size() == initSnapshot.size())("portions", dbPortions.size())("records", initSnapshot.size()); - - std::vector<INormalizerTask::TPtr> tasks; - if (dbPortions.empty()) { - return tasks; - } - - std::vector<TPatchItem> package; - - for (auto&& [portionId, portion] : dbPortions) { - auto it = initSnapshot.find(portionId); - AFL_VERIFY(it != initSnapshot.end()); - package.emplace_back(std::move(portion), it->second); - if (package.size() == 100) { - std::vector<TPatchItem> local; - local.swap(package); - tasks.emplace_back(std::make_shared<TTrivialNormalizerTask>(std::make_shared<TChanges>(std::move(local)))); - } - } - - if (package.size() > 0) { - tasks.emplace_back(std::make_shared<TTrivialNormalizerTask>(std::make_shared<TChanges>(std::move(package)))); - } - return tasks; -} - -} // namespace NKikimr::NOlap::NChunksActualization diff --git a/ydb/core/tx/columnshard/normalizer/portion/snapshot_from_chunks.h b/ydb/core/tx/columnshard/normalizer/portion/snapshot_from_chunks.h deleted file mode 100644 index 5272bd1dadc..00000000000 --- a/ydb/core/tx/columnshard/normalizer/portion/snapshot_from_chunks.h +++ /dev/null @@ -1,47 +0,0 @@ -#pragma once - -#include <ydb/core/tx/columnshard/columnshard_schema.h> -#include <ydb/core/tx/columnshard/defs.h> -#include <ydb/core/tx/columnshard/normalizer/abstract/abstract.h> - -namespace NKikimr::NColumnShard { -class TTablesManager; -} - -namespace NKikimr::NOlap::NSyncMinSnapshotFromChunks { - -class TNormalizer: public TNormalizationController::INormalizerComponent { -private: - using TBase = TNormalizationController::INormalizerComponent; - -public: - static TString GetClassNameStatic() { - return ::ToString(ENormalizerSequentialId::SyncMinSnapshotFromChunks); - } - - virtual std::optional<ENormalizerSequentialId> DoGetEnumSequentialId() const override { - return ENormalizerSequentialId::SyncMinSnapshotFromChunks; - } - - virtual TString GetClassName() const override { - return GetClassNameStatic(); - } - - class TNormalizerResult; - - static inline INormalizerComponent::TFactory::TRegistrator<TNormalizer> Registrator = - INormalizerComponent::TFactory::TRegistrator<TNormalizer>(GetClassNameStatic()); - -public: - TNormalizer(const TNormalizationController::TInitContext& info) - : TBase(info) - , DsGroupSelector(info.GetStorageInfo()) { - } - - virtual TConclusion<std::vector<INormalizerTask::TPtr>> DoInit( - const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) override; - -private: - NColumnShard::TBlobGroupSelector DsGroupSelector; -}; -} // namespace NKikimr::NOlap::NSyncMinSnapshotFromChunks diff --git a/ydb/core/tx/columnshard/normalizer/portion/ya.make b/ydb/core/tx/columnshard/normalizer/portion/ya.make index e7eaf752bad..28b8ad4da38 100644 --- a/ydb/core/tx/columnshard/normalizer/portion/ya.make +++ b/ydb/core/tx/columnshard/normalizer/portion/ya.make @@ -9,10 +9,8 @@ SRCS( GLOBAL broken_blobs.cpp GLOBAL special_cleaner.cpp GLOBAL chunks_actualization.cpp - GLOBAL restore_portion_from_chunks.cpp GLOBAL restore_v1_chunks.cpp GLOBAL restore_v2_chunks.cpp - GLOBAL snapshot_from_chunks.cpp GLOBAL leaked_blobs.cpp ) |