aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov333 <ivanmorozov@ydb.tech>2025-05-04 16:53:49 +0300
committerGitHub <noreply@github.com>2025-05-04 16:53:49 +0300
commitdd4139765d39e76eb3b9af25f52a9f929f125d0d (patch)
tree4effa307a5ebb107dcf4a104779c81e343b84669
parent6b6327610ba481e9335a036c6d44815e4cd6ba97 (diff)
downloadydb-dd4139765d39e76eb3b9af25f52a9f929f125d0d.tar.gz
dont fill deprecated field (#17976)
Co-authored-by: ivanmorozov333 <imorozov333@ya.ru>
-rw-r--r--ydb/core/tx/columnshard/data_sharing/protos/data.proto1
-rw-r--r--ydb/core/tx/columnshard/engines/db_wrapper.cpp6
-rw-r--r--ydb/core/tx/columnshard/engines/portions/constructor_portion.cpp1
-rw-r--r--ydb/core/tx/columnshard/engines/portions/constructor_portion.h8
-rw-r--r--ydb/core/tx/columnshard/engines/portions/portion_info.cpp16
-rw-r--r--ydb/core/tx/columnshard/engines/portions/portion_info.h26
-rw-r--r--ydb/core/tx/columnshard/engines/portions/read_with_blobs.cpp2
-rw-r--r--ydb/core/tx/columnshard/engines/portions/write_with_blobs.cpp4
-rw-r--r--ydb/core/tx/columnshard/engines/storage/granule/granule.cpp1
-rw-r--r--ydb/core/tx/columnshard/engines/storage/granule/stages.cpp1
-rw-r--r--ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp2
-rw-r--r--ydb/core/tx/columnshard/normalizer/abstract/abstract.cpp4
-rw-r--r--ydb/core/tx/columnshard/normalizer/abstract/abstract.h5
-rw-r--r--ydb/core/tx/columnshard/normalizer/portion/normalizer.cpp5
-rw-r--r--ydb/core/tx/columnshard/normalizer/portion/restore_portion_from_chunks.cpp148
-rw-r--r--ydb/core/tx/columnshard/normalizer/portion/restore_portion_from_chunks.h47
-rw-r--r--ydb/core/tx/columnshard/normalizer/portion/snapshot_from_chunks.cpp138
-rw-r--r--ydb/core/tx/columnshard/normalizer/portion/snapshot_from_chunks.h47
-rw-r--r--ydb/core/tx/columnshard/normalizer/portion/ya.make2
19 files changed, 21 insertions, 443 deletions
diff --git a/ydb/core/tx/columnshard/data_sharing/protos/data.proto b/ydb/core/tx/columnshard/data_sharing/protos/data.proto
index 6ead2d5241e..257ca28ce84 100644
--- a/ydb/core/tx/columnshard/data_sharing/protos/data.proto
+++ b/ydb/core/tx/columnshard/data_sharing/protos/data.proto
@@ -29,7 +29,6 @@ message TIndexChunk {
message TPortionInfo {
optional uint64 PathId = 1;
optional uint64 PortionId = 2;
- optional NKikimrColumnShardProto.TSnapshot MinSnapshotDeprecated = 3;
optional NKikimrColumnShardProto.TSnapshot RemoveSnapshot = 4;
optional NKikimrTxColumnShard.TIndexPortionMeta Meta = 5;
repeated TColumnRecord Records = 6;
diff --git a/ydb/core/tx/columnshard/engines/db_wrapper.cpp b/ydb/core/tx/columnshard/engines/db_wrapper.cpp
index 06f8e9898e5..07abcff14ad 100644
--- a/ydb/core/tx/columnshard/engines/db_wrapper.cpp
+++ b/ydb/core/tx/columnshard/engines/db_wrapper.cpp
@@ -162,10 +162,8 @@ bool TDbWrapper::LoadPortions(const std::optional<TInternalPathId> pathId,
portion->SetShardingVersion(rowset.template GetValue<IndexPortions::ShardingVersion>());
}
portion->SetRemoveSnapshot(rowset.template GetValue<IndexPortions::XPlanStep>(), rowset.template GetValue<IndexPortions::XTxId>());
- if (rowset.template GetValue<IndexPortions::MinSnapshotPlanStep>()) {
- portion->SetMinSnapshotDeprecated(TSnapshot(
- rowset.template GetValue<IndexPortions::MinSnapshotPlanStep>(), rowset.template GetValue<IndexPortions::MinSnapshotTxId>()));
- }
+ portion->SetMinSnapshotDeprecated(TSnapshot(
+ rowset.template GetValue<IndexPortions::MinSnapshotPlanStep>(), rowset.template GetValue<IndexPortions::MinSnapshotTxId>()));
NKikimrTxColumnShard::TIndexPortionMeta metaProto;
const TString metadata = rowset.template GetValue<NColumnShard::Schema::IndexPortions::Metadata>();
diff --git a/ydb/core/tx/columnshard/engines/portions/constructor_portion.cpp b/ydb/core/tx/columnshard/engines/portions/constructor_portion.cpp
index a190023c7e4..5300bb325e1 100644
--- a/ydb/core/tx/columnshard/engines/portions/constructor_portion.cpp
+++ b/ydb/core/tx/columnshard/engines/portions/constructor_portion.cpp
@@ -28,7 +28,6 @@ std::shared_ptr<TPortionInfo> TPortionInfoConstructor::Build() {
result->PortionId = GetPortionIdVerified();
AFL_VERIFY(MinSnapshotDeprecated);
- AFL_VERIFY(MinSnapshotDeprecated->Valid());
result->MinSnapshotDeprecated = *MinSnapshotDeprecated;
if (RemoveSnapshot) {
AFL_VERIFY(RemoveSnapshot->Valid());
diff --git a/ydb/core/tx/columnshard/engines/portions/constructor_portion.h b/ydb/core/tx/columnshard/engines/portions/constructor_portion.h
index dbc67fbc0a5..2a7d4186453 100644
--- a/ydb/core/tx/columnshard/engines/portions/constructor_portion.h
+++ b/ydb/core/tx/columnshard/engines/portions/constructor_portion.h
@@ -122,15 +122,19 @@ public:
std::shared_ptr<ISnapshotSchema> GetSchema(const TVersionedIndex& index) const;
void SetMinSnapshotDeprecated(const TSnapshot& snap) {
- Y_ABORT_UNLESS(snap.Valid());
MinSnapshotDeprecated = snap;
}
void SetSchemaVersion(const ui64 version) {
- // AFL_VERIFY(version);
+ AFL_VERIFY(version);
SchemaVersion = version;
}
+ ui64 GetSchemaVersionVerified() const {
+ AFL_VERIFY(SchemaVersion);
+ return *SchemaVersion;
+ }
+
void SetShardingVersion(const ui64 version) {
// AFL_VERIFY(version);
ShardingVersion = version;
diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp
index be0457b5a1f..350252c857e 100644
--- a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp
+++ b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp
@@ -59,7 +59,6 @@ void TPortionInfo::SerializeToProto(NKikimrColumnShardDataSharingProto::TPortion
proto.SetPathId(PathId.GetRawValue());
proto.SetPortionId(PortionId);
proto.SetSchemaVersion(GetSchemaVersionVerified());
- *proto.MutableMinSnapshotDeprecated() = MinSnapshotDeprecated.SerializeToProto();
if (!RemoveSnapshot.IsZero()) {
*proto.MutableRemoveSnapshot() = RemoveSnapshot.SerializeToProto();
}
@@ -74,12 +73,6 @@ TConclusionStatus TPortionInfo::DeserializeFromProto(const NKikimrColumnShardDat
if (!SchemaVersion) {
return TConclusionStatus::Fail("portion's schema version cannot been equals to zero");
}
- {
- auto parse = MinSnapshotDeprecated.DeserializeFromProto(proto.GetMinSnapshotDeprecated());
- if (!parse) {
- return parse;
- }
- }
if (proto.HasRemoveSnapshot()) {
auto parse = RemoveSnapshot.DeserializeFromProto(proto.GetRemoveSnapshot());
if (!parse) {
@@ -96,15 +89,6 @@ ISnapshotSchema::TPtr TPortionInfo::GetSchema(const TVersionedIndex& index) cons
return schema;
}
-ISnapshotSchema::TPtr TPortionInfo::TSchemaCursor::GetSchema(const TPortionInfoConstructor& portion) {
- if (!CurrentSchema || portion.GetMinSnapshotDeprecatedVerified() != LastSnapshot) {
- CurrentSchema = portion.GetSchema(VersionedIndex);
- LastSnapshot = portion.GetMinSnapshotDeprecatedVerified();
- }
- AFL_VERIFY(!!CurrentSchema);
- return CurrentSchema;
-}
-
bool TPortionInfo::NeedShardingFilter(const TGranuleShardingInfo& shardingInfo) const {
if (ShardingVersion && shardingInfo.GetSnapshotVersion() <= *ShardingVersion) {
return false;
diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.h b/ydb/core/tx/columnshard/engines/portions/portion_info.h
index 624a981e58e..a2b395deefc 100644
--- a/ydb/core/tx/columnshard/engines/portions/portion_info.h
+++ b/ydb/core/tx/columnshard/engines/portions/portion_info.h
@@ -101,7 +101,6 @@ private:
void FullValidation() const {
AFL_VERIFY(PathId);
AFL_VERIFY(PortionId);
- AFL_VERIFY(MinSnapshotDeprecated.Valid());
AFL_VERIFY(SchemaVersion);
Meta.FullValidation();
}
@@ -120,7 +119,6 @@ public:
virtual bool IsCommitted() const = 0;
const TSnapshot& GetMinSnapshotDeprecated() const {
- AFL_VERIFY(MinSnapshotDeprecated.Valid());
return MinSnapshotDeprecated;
}
@@ -321,7 +319,7 @@ public:
}
bool ValidSnapshotInfo() const {
- return MinSnapshotDeprecated.Valid() && PathId && PortionId;
+ return SchemaVersion && PathId && PortionId;
}
TString DebugString(const bool withDetails = false) const;
@@ -400,28 +398,6 @@ public:
virtual const TSnapshot& RecordSnapshotMin(const std::optional<TSnapshot>& snapshotDefault = std::nullopt) const = 0;
virtual const TSnapshot& RecordSnapshotMax(const std::optional<TSnapshot>& snapshotDefault = std::nullopt) const = 0;
- class TSchemaCursor {
- const NOlap::TVersionedIndex& VersionedIndex;
- ISnapshotSchema::TPtr CurrentSchema;
- TSnapshot LastSnapshot = TSnapshot::Zero();
-
- public:
- TSchemaCursor(const NOlap::TVersionedIndex& versionedIndex)
- : VersionedIndex(versionedIndex) {
- }
-
- ISnapshotSchema::TPtr GetSchema(const TPortionInfoConstructor& portion);
-
- ISnapshotSchema::TPtr GetSchema(const TPortionInfo& portion) {
- if (!CurrentSchema || portion.MinSnapshotDeprecated != LastSnapshot) {
- CurrentSchema = portion.GetSchema(VersionedIndex);
- LastSnapshot = portion.MinSnapshotDeprecated;
- }
- AFL_VERIFY(!!CurrentSchema)("portion", portion.DebugString());
- return CurrentSchema;
- }
- };
-
ISnapshotSchema::TPtr GetSchema(const TVersionedIndex& index) const;
ui32 GetRecordsCount() const {
diff --git a/ydb/core/tx/columnshard/engines/portions/read_with_blobs.cpp b/ydb/core/tx/columnshard/engines/portions/read_with_blobs.cpp
index 6c040921608..bcdcb1ff10d 100644
--- a/ydb/core/tx/columnshard/engines/portions/read_with_blobs.cpp
+++ b/ydb/core/tx/columnshard/engines/portions/read_with_blobs.cpp
@@ -117,7 +117,7 @@ std::optional<TWritePortionInfoWithBlobsResult> TReadPortionInfoWithBlobs::SyncP
}
TPortionAccessorConstructor constructor = TPortionAccessorConstructor::BuildForRewriteBlobs(source.PortionInfo.GetPortionInfo());
- constructor.MutablePortionConstructor().SetMinSnapshotDeprecated(to->GetSnapshot());
+ constructor.MutablePortionConstructor().SetMinSnapshotDeprecated(TSnapshot(0, 0));
constructor.MutablePortionConstructor().SetSchemaVersion(to->GetVersion());
constructor.MutablePortionConstructor().MutableMeta().ResetTierName(targetTier);
diff --git a/ydb/core/tx/columnshard/engines/portions/write_with_blobs.cpp b/ydb/core/tx/columnshard/engines/portions/write_with_blobs.cpp
index 8e0141913d7..1c440c9f62c 100644
--- a/ydb/core/tx/columnshard/engines/portions/write_with_blobs.cpp
+++ b/ydb/core/tx/columnshard/engines/portions/write_with_blobs.cpp
@@ -30,7 +30,7 @@ void TWritePortionInfoWithBlobsResult::TBlobInfo::RegisterBlobId(TWritePortionIn
TWritePortionInfoWithBlobsConstructor TWritePortionInfoWithBlobsConstructor::BuildByBlobs(std::vector<TSplittedBlob>&& chunks,
const THashMap<ui32, std::shared_ptr<IPortionDataChunk>>& inplaceChunks, const TInternalPathId granule, const ui64 schemaVersion,
- const TSnapshot& snapshot, const std::shared_ptr<IStoragesManager>& operators, const EPortionType type) {
+ const TSnapshot& /*snapshot*/, const std::shared_ptr<IStoragesManager>& operators, const EPortionType type) {
TPortionAccessorConstructor constructor = [&]() {
switch (type) {
case EPortionType::Written:
@@ -39,7 +39,7 @@ TWritePortionInfoWithBlobsConstructor TWritePortionInfoWithBlobsConstructor::Bui
return TPortionAccessorConstructor(std::make_unique<TCompactedPortionInfoConstructor>(granule));
}
}();
- constructor.MutablePortionConstructor().SetMinSnapshotDeprecated(snapshot);
+ constructor.MutablePortionConstructor().SetMinSnapshotDeprecated(TSnapshot(0, 0));
constructor.MutablePortionConstructor().SetSchemaVersion(schemaVersion);
return BuildByBlobs(std::move(chunks), inplaceChunks, std::move(constructor), operators);
}
diff --git a/ydb/core/tx/columnshard/engines/storage/granule/granule.cpp b/ydb/core/tx/columnshard/engines/storage/granule/granule.cpp
index 15e2be8e9d8..109b018667e 100644
--- a/ydb/core/tx/columnshard/engines/storage/granule/granule.cpp
+++ b/ydb/core/tx/columnshard/engines/storage/granule/granule.cpp
@@ -245,7 +245,6 @@ bool TGranuleMeta::TestingLoad(IDbWrapper& db, const TVersionedIndex& versionedI
}
{
- TPortionInfo::TSchemaCursor schema(versionedIndex);
if (!db.LoadColumns(PathId, [&](TColumnChunkLoadContextV2&& loadContext) {
auto* constructor = constructors.GetConstructorVerified(loadContext.GetPortionId());
for (auto&& i : loadContext.BuildRecordsV1()) {
diff --git a/ydb/core/tx/columnshard/engines/storage/granule/stages.cpp b/ydb/core/tx/columnshard/engines/storage/granule/stages.cpp
index bd38f91069d..48c7963c26c 100644
--- a/ydb/core/tx/columnshard/engines/storage/granule/stages.cpp
+++ b/ydb/core/tx/columnshard/engines/storage/granule/stages.cpp
@@ -29,7 +29,6 @@ bool TGranuleOnlyPortionsReader::DoPrecharge(NTabletFlatExecutor::TTransactionCo
bool TGranuleColumnsReader::DoExecute(NTabletFlatExecutor::TTransactionContext& txc, const TActorContext& /*ctx*/) {
TDbWrapper db(txc.DB, &*DsGroupSelector);
- TPortionInfo::TSchemaCursor schema(*VersionedIndex);
Context->ClearRecords();
return db.LoadColumns(Self->GetPathId(), [&](TColumnChunkLoadContextV2&& loadContext) {
Context->Add(std::move(loadContext));
diff --git a/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp b/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp
index 3cf2eaf180e..1dfd474bf4d 100644
--- a/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp
+++ b/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp
@@ -138,7 +138,7 @@ public:
Y_ABORT_UNLESS(portion.GetPathId() == it->second.MutablePortionConstructor().GetPathId() &&
portion.GetPortionId() == it->second.MutablePortionConstructor().GetPortionIdVerified());
}
- it->second.MutablePortionConstructor().SetMinSnapshotDeprecated(portion.GetMinSnapshotDeprecated());
+ it->second.MutablePortionConstructor().SetSchemaVersion(portion.GetSchemaVersionVerified());
if (portion.HasRemoveSnapshot()) {
if (!it->second.MutablePortionConstructor().HasRemoveSnapshot()) {
it->second.MutablePortionConstructor().SetRemoveSnapshot(portion.GetRemoveSnapshotVerified());
diff --git a/ydb/core/tx/columnshard/normalizer/abstract/abstract.cpp b/ydb/core/tx/columnshard/normalizer/abstract/abstract.cpp
index 7ca284d9d8d..4bb3ce6a641 100644
--- a/ydb/core/tx/columnshard/normalizer/abstract/abstract.cpp
+++ b/ydb/core/tx/columnshard/normalizer/abstract/abstract.cpp
@@ -100,7 +100,9 @@ void TNormalizationController::InitNormalizers(const TInitContext& ctx) {
if (::ToString(nType).StartsWith("Deprecated")) {
continue;
}
- auto normalizer = RegisterNormalizer(std::shared_ptr<INormalizerComponent>(INormalizerComponent::TFactory::Construct(::ToString(nType), ctx)));
+ auto component = INormalizerComponent::TFactory::MakeHolder(::ToString(nType), ctx);
+ AFL_VERIFY(component)("class_name", ::ToString(nType));
+ auto normalizer = RegisterNormalizer(std::shared_ptr<INormalizerComponent>(component.Release()));
AFL_VERIFY(normalizer->GetEnumSequentialIdVerified() == nType);
AFL_VERIFY(lastRegisteredNormalizer <= nType)("current", ToString(nType))("last", ToString(lastRegisteredNormalizer));
lastRegisteredNormalizer = nType;
diff --git a/ydb/core/tx/columnshard/normalizer/abstract/abstract.h b/ydb/core/tx/columnshard/normalizer/abstract/abstract.h
index 636177db8f7..8af30bb63f5 100644
--- a/ydb/core/tx/columnshard/normalizer/abstract/abstract.h
+++ b/ydb/core/tx/columnshard/normalizer/abstract/abstract.h
@@ -51,6 +51,7 @@ public:
}
};
+// DONT REMOVE AND DONT CHANGE PLACES! PERSISTENT! ADD Deprecated PREFIX FOR REMOVED NORMALIZER
enum class ENormalizerSequentialId : ui32 {
Granules = 1,
Chunks,
@@ -61,10 +62,10 @@ enum class ENormalizerSequentialId : ui32 {
DeprecatedEmptyPortionsCleaner,
CleanInsertionDedup,
GCCountersNormalizer,
- RestorePortionFromChunks,
+ DeprecatedRestorePortionFromChunks,
SyncPortionFromChunks,
DeprecatedRestoreV1Chunks,
- SyncMinSnapshotFromChunks,
+ DeprecatedSyncMinSnapshotFromChunks,
DeprecatedRestoreV1Chunks_V1,
RestoreV1Chunks_V2,
RestoreV2Chunks,
diff --git a/ydb/core/tx/columnshard/normalizer/portion/normalizer.cpp b/ydb/core/tx/columnshard/normalizer/portion/normalizer.cpp
index 0408e14b7f6..0005d193f67 100644
--- a/ydb/core/tx/columnshard/normalizer/portion/normalizer.cpp
+++ b/ydb/core/tx/columnshard/normalizer/portion/normalizer.cpp
@@ -57,9 +57,9 @@ TConclusion<std::vector<INormalizerTask::TPtr>> TPortionsNormalizerBase::DoInit(
return conclusion;
}
}
- TPortionInfo::TSchemaCursor schema(tablesManager.GetPrimaryIndexSafe().GetVersionedIndex());
for (auto&& [_, p] : portions) {
- (*schemas)[p.GetPortionConstructor().GetPortionIdVerified()] = schema.GetSchema(p.GetPortionConstructor());
+ (*schemas)[p.GetPortionConstructor().GetPortionIdVerified()] =
+ tablesManager.GetPrimaryIndexSafe().GetVersionedIndex().GetSchemaVerified(p.GetPortionConstructor().GetSchemaVersionVerified());
}
std::vector<TPortionDataAccessor> package;
@@ -116,7 +116,6 @@ TConclusionStatus TPortionsNormalizerBase::InitColumns(
return TConclusionStatus::Fail("Not ready");
}
- TPortionInfo::TSchemaCursor schema(tablesManager.GetPrimaryIndexSafe().GetVersionedIndex());
auto initPortion = [&](TColumnChunkLoadContextV1&& loadContext) {
if (!columnsFilter.empty() && !columnsFilter.contains(loadContext.GetAddress().GetColumnId())) {
return;
diff --git a/ydb/core/tx/columnshard/normalizer/portion/restore_portion_from_chunks.cpp b/ydb/core/tx/columnshard/normalizer/portion/restore_portion_from_chunks.cpp
deleted file mode 100644
index 480e27ef055..00000000000
--- a/ydb/core/tx/columnshard/normalizer/portion/restore_portion_from_chunks.cpp
+++ /dev/null
@@ -1,148 +0,0 @@
-#include "normalizer.h"
-#include "restore_portion_from_chunks.h"
-
-#include <ydb/core/formats/arrow/size_calcer.h>
-#include <ydb/core/tx/columnshard/engines/portions/data_accessor.h>
-#include <ydb/core/tx/columnshard/engines/portions/portion_info.h>
-#include <ydb/core/tx/columnshard/tables_manager.h>
-
-namespace NKikimr::NOlap::NRestorePortionsFromChunks {
-
-class TPatchItem {
-private:
- YDB_READONLY(ui32, SchemaVersion, 0);
- TColumnChunkLoadContext ChunkInfo;
-
-public:
- const TColumnChunkLoadContext& GetChunkInfo() const {
- return ChunkInfo;
- }
-
- TPatchItem(const ui32 schemaVersion, TColumnChunkLoadContext&& chunkInfo)
- : SchemaVersion(schemaVersion)
- , ChunkInfo(std::move(chunkInfo)) {
- }
-};
-
-class TChanges: public INormalizerChanges {
-private:
- std::vector<TPatchItem> Patches;
-
-public:
- TChanges(std::vector<TPatchItem>&& patches)
- : Patches(std::move(patches)) {
- }
- virtual bool ApplyOnExecute(NTabletFlatExecutor::TTransactionContext& txc, const TNormalizationController&) const override {
- using namespace NColumnShard;
- NIceDb::TNiceDb db(txc.DB);
- for (auto&& i : Patches) {
- AFL_VERIFY(i.GetChunkInfo().GetMetaProto().HasPortionMeta());
- auto metaProtoString = i.GetChunkInfo().GetMetaProto().GetPortionMeta().SerializeAsString();
- using IndexPortions = NColumnShard::Schema::IndexPortions;
- const auto removeSnapshot = i.GetChunkInfo().GetRemoveSnapshot();
- const auto minSnapshotDeprecated = i.GetChunkInfo().GetMinSnapshotDeprecated();
- db.Table<IndexPortions>()
- .Key(i.GetChunkInfo().GetPathId().GetRawValue(), i.GetChunkInfo().GetPortionId())
- .Update(NIceDb::TUpdate<IndexPortions::SchemaVersion>(i.GetSchemaVersion()), NIceDb::TUpdate<IndexPortions::ShardingVersion>(0),
- NIceDb::TUpdate<IndexPortions::CommitPlanStep>(0), NIceDb::TUpdate<IndexPortions::CommitTxId>(0),
- NIceDb::TUpdate<IndexPortions::InsertWriteId>(0), NIceDb::TUpdate<IndexPortions::XPlanStep>(removeSnapshot.GetPlanStep()),
- NIceDb::TUpdate<IndexPortions::XTxId>(removeSnapshot.GetTxId()),
- NIceDb::TUpdate<IndexPortions::MinSnapshotPlanStep>(minSnapshotDeprecated.GetPlanStep()),
- NIceDb::TUpdate<IndexPortions::MinSnapshotTxId>(minSnapshotDeprecated.GetTxId()),
- NIceDb::TUpdate<IndexPortions::Metadata>(metaProtoString));
- }
-
- return true;
- }
-
- virtual ui64 GetSize() const override {
- return Patches.size();
- }
-};
-
-TConclusion<std::vector<INormalizerTask::TPtr>> TNormalizer::DoInit(
- const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) {
- using namespace NColumnShard;
- NIceDb::TNiceDb db(txc.DB);
-
- bool ready = true;
- ready = ready & Schema::Precharge<Schema::IndexColumns>(db, txc.DB.GetScheme());
- ready = ready & Schema::Precharge<Schema::IndexPortions>(db, txc.DB.GetScheme());
- if (!ready) {
- return TConclusionStatus::Fail("Not ready");
- }
-
- TTablesManager tablesManager(controller.GetStoragesManager(), std::make_shared<NDataAccessorControl::TLocalManager>(nullptr),
- std::make_shared<TSchemaObjectsCache>(), std::make_shared<TPortionIndexStats>(), 0);
- if (!tablesManager.InitFromDB(db)) {
- ACFL_TRACE("normalizer", "TChunksNormalizer")("error", "can't initialize tables manager");
- return TConclusionStatus::Fail("Can't load index");
- }
-
- THashMap<ui64, TPortionLoadContext> dbPortions;
-
- {
- auto rowset = db.Table<Schema::IndexPortions>().Select();
- if (!rowset.IsReady()) {
- return TConclusionStatus::Fail("Not ready");
- }
-
- while (!rowset.EndOfSet()) {
- TPortionLoadContext portion(rowset);
- AFL_VERIFY(dbPortions.emplace(portion.GetPortionId(), portion).second);
-
- if (!rowset.Next()) {
- return TConclusionStatus::Fail("Not ready");
- }
- }
- }
-
- THashMap<ui64, TColumnChunkLoadContext> portionsToWrite;
- {
- auto rowset = db.Table<Schema::IndexColumns>().Select();
- if (!rowset.IsReady()) {
- return TConclusionStatus::Fail("Not ready");
- }
-
- THashSet<ui64> portionsToRestore;
- while (!rowset.EndOfSet()) {
- TColumnChunkLoadContext chunk(rowset, &DsGroupSelector);
- if (!dbPortions.contains(chunk.GetPortionId())) {
- portionsToRestore.emplace(chunk.GetPortionId());
- if (chunk.GetMetaProto().HasPortionMeta()) {
- AFL_VERIFY(portionsToWrite.emplace(chunk.GetPortionId(), chunk).second);
- }
- }
-
- if (!rowset.Next()) {
- return TConclusionStatus::Fail("Not ready");
- }
- }
- AFL_VERIFY(portionsToRestore.size() == portionsToWrite.size());
- }
-
- std::vector<INormalizerTask::TPtr> tasks;
- if (portionsToWrite.empty()) {
- return tasks;
- }
-
- std::vector<TPatchItem> package;
-
- for (auto&& [_, chunkWithPortionData] : portionsToWrite) {
- package.emplace_back(
- tablesManager.GetPrimaryIndexSafe().GetVersionedIndex().GetSchemaVerified(chunkWithPortionData.GetMinSnapshotDeprecated())->GetVersion(),
- std::move(chunkWithPortionData));
- if (package.size() == 100) {
- std::vector<TPatchItem> local;
- local.swap(package);
- tasks.emplace_back(std::make_shared<TTrivialNormalizerTask>(std::make_shared<TChanges>(std::move(local))));
- }
- }
-
- if (package.size() > 0) {
- tasks.emplace_back(std::make_shared<TTrivialNormalizerTask>(std::make_shared<TChanges>(std::move(package))));
- }
- return tasks;
-}
-
-} // namespace NKikimr::NOlap::NRestorePortionsFromChunks
diff --git a/ydb/core/tx/columnshard/normalizer/portion/restore_portion_from_chunks.h b/ydb/core/tx/columnshard/normalizer/portion/restore_portion_from_chunks.h
deleted file mode 100644
index 110b0e35a7c..00000000000
--- a/ydb/core/tx/columnshard/normalizer/portion/restore_portion_from_chunks.h
+++ /dev/null
@@ -1,47 +0,0 @@
-#pragma once
-
-#include <ydb/core/tx/columnshard/columnshard_schema.h>
-#include <ydb/core/tx/columnshard/defs.h>
-#include <ydb/core/tx/columnshard/normalizer/abstract/abstract.h>
-
-namespace NKikimr::NColumnShard {
-class TTablesManager;
-}
-
-namespace NKikimr::NOlap::NRestorePortionsFromChunks {
-
-class TNormalizer: public TNormalizationController::INormalizerComponent {
-private:
- using TBase = TNormalizationController::INormalizerComponent;
-
-public:
- static TString GetClassNameStatic() {
- return ::ToString(ENormalizerSequentialId::RestorePortionFromChunks);
- }
-
- virtual std::optional<ENormalizerSequentialId> DoGetEnumSequentialId() const override {
- return ENormalizerSequentialId::RestorePortionFromChunks;
- }
-
- virtual TString GetClassName() const override {
- return GetClassNameStatic();
- }
-
- class TNormalizerResult;
-
- static inline INormalizerComponent::TFactory::TRegistrator<TNormalizer> Registrator =
- INormalizerComponent::TFactory::TRegistrator<TNormalizer>(GetClassNameStatic());
-
-public:
- TNormalizer(const TNormalizationController::TInitContext& info)
- : TBase(info)
- , DsGroupSelector(info.GetStorageInfo()) {
- }
-
- virtual TConclusion<std::vector<INormalizerTask::TPtr>> DoInit(
- const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) override;
-
-private:
- NColumnShard::TBlobGroupSelector DsGroupSelector;
-};
-} // namespace NKikimr::NOlap::NRestorePortionsFromChunks
diff --git a/ydb/core/tx/columnshard/normalizer/portion/snapshot_from_chunks.cpp b/ydb/core/tx/columnshard/normalizer/portion/snapshot_from_chunks.cpp
deleted file mode 100644
index 1423109e9d2..00000000000
--- a/ydb/core/tx/columnshard/normalizer/portion/snapshot_from_chunks.cpp
+++ /dev/null
@@ -1,138 +0,0 @@
-#include <ydb/core/tx/columnshard/common/path_id.h>
-#include "snapshot_from_chunks.h"
-#include "normalizer.h"
-
-#include <ydb/core/formats/arrow/size_calcer.h>
-#include <ydb/core/tx/columnshard/engines/portions/data_accessor.h>
-#include <ydb/core/tx/columnshard/engines/portions/portion_info.h>
-#include <ydb/core/tx/columnshard/tables_manager.h>
-
-namespace NKikimr::NOlap::NSyncMinSnapshotFromChunks {
-
-class TPatchItem {
-private:
- TPortionLoadContext PortionInfo;
- YDB_READONLY(NOlap::TSnapshot, Snapshot, NOlap::TSnapshot::Zero());
-
-public:
- const TPortionLoadContext& GetPortionInfo() const {
- return PortionInfo;
- }
-
- TPatchItem(TPortionLoadContext&& portion, const NOlap::TSnapshot& snapshot)
- : PortionInfo(std::move(portion))
- , Snapshot(snapshot) {
- }
-};
-
-class TChanges: public INormalizerChanges {
-private:
- std::vector<TPatchItem> Patches;
-
-public:
- TChanges(std::vector<TPatchItem>&& patches)
- : Patches(std::move(patches)) {
- }
- virtual bool ApplyOnExecute(NTabletFlatExecutor::TTransactionContext& txc, const TNormalizationController&) const override {
- using namespace NColumnShard;
- NIceDb::TNiceDb db(txc.DB);
- for (auto&& i : Patches) {
- db.Table<Schema::IndexPortions>()
- .Key(i.GetPortionInfo().GetPathId().GetRawValue(), i.GetPortionInfo().GetPortionId())
- .Update(NIceDb::TUpdate<Schema::IndexPortions::MinSnapshotPlanStep>(i.GetSnapshot().GetPlanStep()),
- NIceDb::TUpdate<Schema::IndexPortions::MinSnapshotTxId>(i.GetSnapshot().GetTxId())
- );
- }
-
- return true;
- }
-
- virtual ui64 GetSize() const override {
- return Patches.size();
- }
-
-};
-
-TConclusion<std::vector<INormalizerTask::TPtr>> TNormalizer::DoInit(
- const TNormalizationController& /*controller*/, NTabletFlatExecutor::TTransactionContext& txc) {
- using namespace NColumnShard;
- NIceDb::TNiceDb db(txc.DB);
-
- bool ready = true;
- ready = ready & Schema::Precharge<Schema::IndexPortions>(db, txc.DB.GetScheme());
- ready = ready & Schema::Precharge<Schema::IndexColumns>(db, txc.DB.GetScheme());
- if (!ready) {
- return TConclusionStatus::Fail("Not ready");
- }
-
- THashMap<ui64, TPortionLoadContext> dbPortions;
- THashMap<ui64, NOlap::TSnapshot> initSnapshot;
-
- {
- auto rowset = db.Table<Schema::IndexPortions>().Select();
- if (!rowset.IsReady()) {
- return TConclusionStatus::Fail("Not ready");
- }
-
- while (!rowset.EndOfSet()) {
- TPortionLoadContext portion(rowset);
- if (!portion.GetDeprecatedMinSnapshot()) {
- AFL_VERIFY(dbPortions.emplace(portion.GetPortionId(), portion).second);
- }
-
- if (!rowset.Next()) {
- return TConclusionStatus::Fail("Not ready");
- }
- }
- }
-
- {
- auto rowset = db.Table<Schema::IndexColumns>().Select();
- if (!rowset.IsReady()) {
- return TConclusionStatus::Fail("Not ready");
- }
-
- while (!rowset.EndOfSet()) {
- TColumnChunkLoadContext chunk(rowset, &DsGroupSelector);
- const ui64 portionId = chunk.GetPortionId();
- if (dbPortions.contains(portionId)) {
- auto it = initSnapshot.find(portionId);
- if (it == initSnapshot.end()) {
- initSnapshot.emplace(portionId, chunk.GetMinSnapshotDeprecated());
- } else {
- AFL_VERIFY(it->second == chunk.GetMinSnapshotDeprecated());
- }
- }
-
- if (!rowset.Next()) {
- return TConclusionStatus::Fail("Not ready");
- }
- }
- }
- AFL_VERIFY(dbPortions.size() == initSnapshot.size())("portions", dbPortions.size())("records", initSnapshot.size());
-
- std::vector<INormalizerTask::TPtr> tasks;
- if (dbPortions.empty()) {
- return tasks;
- }
-
- std::vector<TPatchItem> package;
-
- for (auto&& [portionId, portion] : dbPortions) {
- auto it = initSnapshot.find(portionId);
- AFL_VERIFY(it != initSnapshot.end());
- package.emplace_back(std::move(portion), it->second);
- if (package.size() == 100) {
- std::vector<TPatchItem> local;
- local.swap(package);
- tasks.emplace_back(std::make_shared<TTrivialNormalizerTask>(std::make_shared<TChanges>(std::move(local))));
- }
- }
-
- if (package.size() > 0) {
- tasks.emplace_back(std::make_shared<TTrivialNormalizerTask>(std::make_shared<TChanges>(std::move(package))));
- }
- return tasks;
-}
-
-} // namespace NKikimr::NOlap::NChunksActualization
diff --git a/ydb/core/tx/columnshard/normalizer/portion/snapshot_from_chunks.h b/ydb/core/tx/columnshard/normalizer/portion/snapshot_from_chunks.h
deleted file mode 100644
index 5272bd1dadc..00000000000
--- a/ydb/core/tx/columnshard/normalizer/portion/snapshot_from_chunks.h
+++ /dev/null
@@ -1,47 +0,0 @@
-#pragma once
-
-#include <ydb/core/tx/columnshard/columnshard_schema.h>
-#include <ydb/core/tx/columnshard/defs.h>
-#include <ydb/core/tx/columnshard/normalizer/abstract/abstract.h>
-
-namespace NKikimr::NColumnShard {
-class TTablesManager;
-}
-
-namespace NKikimr::NOlap::NSyncMinSnapshotFromChunks {
-
-class TNormalizer: public TNormalizationController::INormalizerComponent {
-private:
- using TBase = TNormalizationController::INormalizerComponent;
-
-public:
- static TString GetClassNameStatic() {
- return ::ToString(ENormalizerSequentialId::SyncMinSnapshotFromChunks);
- }
-
- virtual std::optional<ENormalizerSequentialId> DoGetEnumSequentialId() const override {
- return ENormalizerSequentialId::SyncMinSnapshotFromChunks;
- }
-
- virtual TString GetClassName() const override {
- return GetClassNameStatic();
- }
-
- class TNormalizerResult;
-
- static inline INormalizerComponent::TFactory::TRegistrator<TNormalizer> Registrator =
- INormalizerComponent::TFactory::TRegistrator<TNormalizer>(GetClassNameStatic());
-
-public:
- TNormalizer(const TNormalizationController::TInitContext& info)
- : TBase(info)
- , DsGroupSelector(info.GetStorageInfo()) {
- }
-
- virtual TConclusion<std::vector<INormalizerTask::TPtr>> DoInit(
- const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) override;
-
-private:
- NColumnShard::TBlobGroupSelector DsGroupSelector;
-};
-} // namespace NKikimr::NOlap::NSyncMinSnapshotFromChunks
diff --git a/ydb/core/tx/columnshard/normalizer/portion/ya.make b/ydb/core/tx/columnshard/normalizer/portion/ya.make
index e7eaf752bad..28b8ad4da38 100644
--- a/ydb/core/tx/columnshard/normalizer/portion/ya.make
+++ b/ydb/core/tx/columnshard/normalizer/portion/ya.make
@@ -9,10 +9,8 @@ SRCS(
GLOBAL broken_blobs.cpp
GLOBAL special_cleaner.cpp
GLOBAL chunks_actualization.cpp
- GLOBAL restore_portion_from_chunks.cpp
GLOBAL restore_v1_chunks.cpp
GLOBAL restore_v2_chunks.cpp
- GLOBAL snapshot_from_chunks.cpp
GLOBAL leaked_blobs.cpp
)