diff options
author | nsofya <nsofya@ydb.tech> | 2023-12-13 17:58:37 +0300 |
---|---|---|
committer | nsofya <nsofya@ydb.tech> | 2023-12-13 20:38:25 +0300 |
commit | e2ae1d98f104be3d94d131d04b20886c5c06089f (patch) | |
tree | 8f1dc113c34192cdca8894ccbeba0e679aa78224 | |
parent | 1d09d7aba5ab2b3a137b6bb0efaac974f6ca99b5 (diff) | |
download | ydb-e2ae1d98f104be3d94d131d04b20886c5c06089f.tar.gz |
KIKIMR-20482: Remove duplicates on schema versions
12 files changed, 52 insertions, 45 deletions
diff --git a/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp b/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp index 5c057f3afd..dac5556582 100644 --- a/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp +++ b/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp @@ -13,7 +13,7 @@ bool TTxWrite::InsertOneBlob(TTransactionContext& txc, const TEvPrivate::TEvWrit const auto& writeMeta(PutBlobResult->Get()->GetWriteMeta()); - auto tableSchema = Self->TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetSchemaUnsafe(PutBlobResult->Get()->GetSchemaVersion()); + auto tableSchema = Self->TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetSchemaVerified(PutBlobResult->Get()->GetSchemaVersion()); NOlap::TInsertedData insertData((ui64)writeId, writeMeta.GetTableId(), writeMeta.GetDedupId(), blobRange, meta, tableSchema->GetVersion(), blob); bool ok = Self->InsertTable->Insert(dbTable, std::move(insertData)); diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp index c79a3ee408..6c143dda48 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.cpp +++ b/ydb/core/tx/columnshard/columnshard_impl.cpp @@ -519,6 +519,8 @@ void TColumnShard::RunEnsureTable(const NKikimrTxColumnShard::TCreateTable& tabl TTableInfo::TTableVersionInfo tableVerProto; tableVerProto.SetPathId(pathId); + // check schema changed + if (tableProto.HasSchemaPreset()) { Y_ABORT_UNLESS(!tableProto.HasSchema(), "Tables has either schema or preset"); @@ -528,7 +530,7 @@ void TColumnShard::RunEnsureTable(const NKikimrTxColumnShard::TCreateTable& tabl tableVerProto.SetSchemaPresetId(preset.GetId()); if (TablesManager.RegisterSchemaPreset(preset, db)) { - TablesManager.AddPresetVersion(tableProto.GetSchemaPreset().GetId(), version, tableProto.GetSchemaPreset().GetSchema(), db); + TablesManager.AddSchemaVersion(tableProto.GetSchemaPreset().GetId(), version, tableProto.GetSchemaPreset().GetSchema(), db); } } else { Y_ABORT_UNLESS(tableProto.HasSchema(), "Tables has either schema or preset"); @@ -571,7 +573,7 @@ void TColumnShard::RunAlterTable(const NKikimrTxColumnShard::TAlterTable& alterP TTableInfo::TTableVersionInfo tableVerProto; if (alterProto.HasSchemaPreset()) { tableVerProto.SetSchemaPresetId(alterProto.GetSchemaPreset().GetId()); - TablesManager.AddPresetVersion(alterProto.GetSchemaPreset().GetId(), version, alterProto.GetSchemaPreset().GetSchema(), db); + TablesManager.AddSchemaVersion(alterProto.GetSchemaPreset().GetId(), version, alterProto.GetSchemaPreset().GetSchema(), db); } else if (alterProto.HasSchema()) { *tableVerProto.MutableSchema() = alterProto.GetSchema(); } @@ -630,7 +632,7 @@ void TColumnShard::RunAlterStore(const NKikimrTxColumnShard::TAlterStore& proto, if (!TablesManager.HasPreset(presetProto.GetId())) { continue; // we don't update presets that we don't use } - TablesManager.AddPresetVersion(presetProto.GetId(), version, presetProto.GetSchema(), db); + TablesManager.AddSchemaVersion(presetProto.GetId(), version, presetProto.GetSchema(), db); } } diff --git a/ydb/core/tx/columnshard/engines/changes/indexation.cpp b/ydb/core/tx/columnshard/engines/changes/indexation.cpp index 8c23b15af2..9247e37a64 100644 --- a/ydb/core/tx/columnshard/engines/changes/indexation.cpp +++ b/ydb/core/tx/columnshard/engines/changes/indexation.cpp @@ -70,7 +70,7 @@ TConclusionStatus TInsertColumnEngineChanges::DoConstructBlobs(TConstructionCont for (auto& inserted : DataToIndex) { const TBlobRange& blobRange = inserted.GetBlobRange(); - auto blobSchema = context.SchemaVersions.GetSchema(inserted.GetSchemaVersion()); + auto blobSchema = context.SchemaVersions.GetSchemaVerified(inserted.GetSchemaVersion()); auto& indexInfo = blobSchema->GetIndexInfo(); Y_ABORT_UNLESS(indexInfo.IsSorted()); @@ -88,7 +88,7 @@ TConclusionStatus TInsertColumnEngineChanges::DoConstructBlobs(TConstructionCont ; } - batch = AddSpecials(batch, blobSchema->GetIndexInfo(), inserted); + batch = AddSpecials(batch, indexInfo, inserted); batch = resultSchema->NormalizeBatch(*blobSchema, batch); pathBatches[inserted.PathId].push_back(batch); Y_DEBUG_ABORT_UNLESS(NArrow::IsSorted(pathBatches[inserted.PathId].back(), resultSchema->GetIndexInfo().GetReplaceKey())); diff --git a/ydb/core/tx/columnshard/engines/column_engine.h b/ydb/core/tx/columnshard/engines/column_engine.h index d074e1510d..7697e28fb0 100644 --- a/ydb/core/tx/columnshard/engines/column_engine.h +++ b/ydb/core/tx/columnshard/engines/column_engine.h @@ -305,9 +305,9 @@ public: return it == SnapshotByVersion.end() ? nullptr : it->second; } - ISnapshotSchema::TPtr GetSchemaUnsafe(const ui64 version) const { + ISnapshotSchema::TPtr GetSchemaVerified(const ui64 version) const { auto it = SnapshotByVersion.find(version); - Y_ABORT_UNLESS(it != SnapshotByVersion.end()); + Y_ABORT_UNLESS(it != SnapshotByVersion.end(), "no schema for version %lu", version); return it->second; } @@ -331,23 +331,21 @@ public: return IndexKey; } - void AddIndex(const TSnapshot& version, TIndexInfo&& indexInfo) { + void AddIndex(const TSnapshot& snapshot, TIndexInfo&& indexInfo) { if (Snapshots.empty()) { IndexKey = indexInfo.GetIndexKey(); } else { Y_ABORT_UNLESS(IndexKey->Equals(indexInfo.GetIndexKey())); } - auto it = Snapshots.emplace(version, std::make_shared<TSnapshotSchema>(std::move(indexInfo), version)); - Y_ABORT_UNLESS(it.second); - auto newVersion = it.first->second->GetVersion(); - if (SnapshotByVersion.contains(newVersion)) { - Y_VERIFY_S(LastSchemaVersion != 0, TStringBuilder() << "Last: " << LastSchemaVersion); - Y_VERIFY_S(LastSchemaVersion == newVersion, TStringBuilder() << "Last: " << LastSchemaVersion << ";New: " << newVersion); + auto newVersion = indexInfo.GetVersion(); + auto itVersion = SnapshotByVersion.emplace(newVersion, std::make_shared<TSnapshotSchema>(std::move(indexInfo), snapshot)); + if (!itVersion.second) { + AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("message", "Skip registered version")("version", LastSchemaVersion); } - - SnapshotByVersion[newVersion] = it.first->second; - LastSchemaVersion = newVersion; + auto itSnap = Snapshots.emplace(snapshot, itVersion.first->second); + Y_ABORT_UNLESS(itSnap.second); + LastSchemaVersion = std::max(newVersion, LastSchemaVersion); } }; @@ -378,8 +376,7 @@ public: virtual std::shared_ptr<TTTLColumnEngineChanges> StartTtl(const THashMap<ui64, TTiering>& pathEviction, const THashSet<TPortionAddress>& busyPortions, ui64 maxBytesToEvict = TCompactionLimits::DEFAULT_EVICTION_BYTES) noexcept = 0; virtual bool ApplyChanges(IDbWrapper& db, std::shared_ptr<TColumnEngineChanges> changes, const TSnapshot& snapshot) noexcept = 0; - virtual void UpdateDefaultSchema(const TSnapshot& snapshot, TIndexInfo&& info) = 0; - //virtual void UpdateTableSchema(ui64 pathId, const TSnapshot& snapshot, TIndexInfo&& info) = 0; // TODO + virtual void RegisterSchemaVersion(const TSnapshot& snapshot, TIndexInfo&& info) = 0; virtual const TMap<ui64, std::shared_ptr<TColumnEngineStats>>& GetStats() const = 0; virtual const TColumnEngineStats& GetTotalStats() = 0; virtual ui64 MemoryUsage() const { return 0; } diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp index 7480b083d9..18290e79d9 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp @@ -129,13 +129,16 @@ void TColumnEngineForLogs::UpdatePortionStats(TColumnEngineStats& engineStats, c } } -void TColumnEngineForLogs::UpdateDefaultSchema(const TSnapshot& snapshot, TIndexInfo&& info) { +void TColumnEngineForLogs::RegisterSchemaVersion(const TSnapshot& snapshot, TIndexInfo&& indexInfo) { if (!ColumnsTable) { - ui32 indexId = info.GetId(); + ui32 indexId = indexInfo.GetId(); ColumnsTable = std::make_shared<TColumnsTable>(indexId); CountersTable = std::make_shared<TCountersTable>(indexId); + } else { + const NOlap::TIndexInfo& lastIndexInfo = VersionedIndex.GetLastSchema()->GetIndexInfo(); + Y_ABORT_UNLESS(lastIndexInfo.CheckCompatible(indexInfo)); } - VersionedIndex.AddIndex(snapshot, std::move(info)); + VersionedIndex.AddIndex(snapshot, std::move(indexInfo)); } bool TColumnEngineForLogs::Load(IDbWrapper& db) { @@ -143,6 +146,7 @@ bool TColumnEngineForLogs::Load(IDbWrapper& db) { Loaded = true; THashMap<ui64, ui64> granuleToPathIdDecoder; { + TMemoryProfileGuard g("TTxInit/LoadColumns"); auto guard = GranulesStorage->StartPackModification(); if (!LoadColumns(db)) { return false; diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.h b/ydb/core/tx/columnshard/engines/column_engine_logs.h index b0098c6d2c..00439c1980 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.h +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.h @@ -141,9 +141,7 @@ public: bool ApplyChanges(IDbWrapper& db, std::shared_ptr<TColumnEngineChanges> indexChanges, const TSnapshot& snapshot) noexcept override; - void UpdateDefaultSchema(const TSnapshot& snapshot, TIndexInfo&& info) override; - - + void RegisterSchemaVersion(const TSnapshot& snapshot, TIndexInfo&& info) override; std::shared_ptr<TSelectInfo> Select(ui64 pathId, TSnapshot snapshot, const TPKRangesFilter& pkRangesFilter) const override; diff --git a/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.cpp b/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.cpp index 6f72ad5968..7bcfff8a55 100644 --- a/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.cpp +++ b/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.cpp @@ -29,8 +29,8 @@ std::shared_ptr<arrow::RecordBatch> ISnapshotSchema::NormalizeBatch(const ISnaps if (dataSchema.GetSnapshot() == GetSnapshot()) { return batch; } - const std::shared_ptr<arrow::Schema>& resultArrowSchema = GetSchema(); Y_ABORT_UNLESS(dataSchema.GetSnapshot() < GetSnapshot()); + const std::shared_ptr<arrow::Schema>& resultArrowSchema = GetSchema(); std::vector<std::shared_ptr<arrow::Array>> newColumns; newColumns.reserve(resultArrowSchema->num_fields()); diff --git a/ydb/core/tx/columnshard/engines/scheme/index_info.cpp b/ydb/core/tx/columnshard/engines/scheme/index_info.cpp index 0e492e596d..b438621663 100644 --- a/ydb/core/tx/columnshard/engines/scheme/index_info.cpp +++ b/ydb/core/tx/columnshard/engines/scheme/index_info.cpp @@ -28,6 +28,16 @@ TIndexInfo::TIndexInfo(const TString& name, ui32 id) , Name(name) {} +bool TIndexInfo::CheckCompatible(const TIndexInfo& other) const { + if (!other.GetReplaceKey()->Equals(GetReplaceKey())) { + return false; + } + if (!other.GetIndexKey()->Equals(GetIndexKey())) { + return false; + } + return true; +} + std::shared_ptr<arrow::RecordBatch> TIndexInfo::AddSpecialColumns(const std::shared_ptr<arrow::RecordBatch>& batch, const TSnapshot& snapshot) { Y_ABORT_UNLESS(batch); i64 numColumns = batch->num_columns(); diff --git a/ydb/core/tx/columnshard/engines/scheme/index_info.h b/ydb/core/tx/columnshard/engines/scheme/index_info.h index b365f7227e..4d27e29ebe 100644 --- a/ydb/core/tx/columnshard/engines/scheme/index_info.h +++ b/ydb/core/tx/columnshard/engines/scheme/index_info.h @@ -91,7 +91,6 @@ public: } public: - static TIndexInfo BuildDefault() { TIndexInfo result("dummy", 0); return result; @@ -209,6 +208,8 @@ public: return Version; } + bool CheckCompatible(const TIndexInfo& other) const; + private: ui32 Id; ui64 Version = 0; diff --git a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp index 1e3d7f712f..8e0c8ceef1 100644 --- a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp +++ b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp @@ -400,7 +400,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { // load TColumnEngineForLogs engine(0, TestLimits(), CommonStoragesManager); TSnapshot indexSnaphot(1, 1); - engine.UpdateDefaultSchema(indexSnaphot, TIndexInfo(tableInfo)); + engine.RegisterSchemaVersion(indexSnaphot, TIndexInfo(tableInfo)); for (auto&& i : paths) { engine.RegisterTable(i); } @@ -485,7 +485,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { TSnapshot indexSnapshot(1, 1); TColumnEngineForLogs engine(0, TestLimits(), CommonStoragesManager); - engine.UpdateDefaultSchema(indexSnapshot, TIndexInfo(tableInfo)); + engine.RegisterSchemaVersion(indexSnapshot, TIndexInfo(tableInfo)); engine.RegisterTable(pathId); engine.Load(db); @@ -585,7 +585,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { TColumnEngineForLogs engine(0, TestLimits(), CommonStoragesManager); TSnapshot indexSnapshot(1, 1); - engine.UpdateDefaultSchema(indexSnapshot, TIndexInfo(tableInfo)); + engine.RegisterSchemaVersion(indexSnapshot, TIndexInfo(tableInfo)); engine.RegisterTable(pathId); engine.Load(db); @@ -612,7 +612,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { { // check it's overloaded after reload TColumnEngineForLogs tmpEngine(0, TestLimits(), CommonStoragesManager); - tmpEngine.UpdateDefaultSchema(TSnapshot::Zero(), TIndexInfo(tableInfo)); + tmpEngine.RegisterSchemaVersion(TSnapshot::Zero(), TIndexInfo(tableInfo)); tmpEngine.RegisterTable(pathId); tmpEngine.Load(db); } @@ -643,7 +643,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { { // check it's not overloaded after reload TColumnEngineForLogs tmpEngine(0, TestLimits(), CommonStoragesManager); - tmpEngine.UpdateDefaultSchema(TSnapshot::Zero(), TIndexInfo(tableInfo)); + tmpEngine.RegisterSchemaVersion(TSnapshot::Zero(), TIndexInfo(tableInfo)); tmpEngine.RegisterTable(pathId); tmpEngine.Load(db); } @@ -661,7 +661,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { TSnapshot indexSnapshot(1, 1); { TColumnEngineForLogs engine(0, TestLimits(), CommonStoragesManager); - engine.UpdateDefaultSchema(indexSnapshot, TIndexInfo(tableInfo)); + engine.RegisterSchemaVersion(indexSnapshot, TIndexInfo(tableInfo)); engine.RegisterTable(pathId); engine.Load(db); @@ -728,7 +728,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { { // load TColumnEngineForLogs engine(0, TestLimits(), CommonStoragesManager); - engine.UpdateDefaultSchema(indexSnapshot, TIndexInfo(tableInfo)); + engine.RegisterSchemaVersion(indexSnapshot, TIndexInfo(tableInfo)); engine.RegisterTable(pathId); engine.Load(db); diff --git a/ydb/core/tx/columnshard/tables_manager.cpp b/ydb/core/tx/columnshard/tables_manager.cpp index ee9222a40c..cab9f5758c 100644 --- a/ydb/core/tx/columnshard/tables_manager.cpp +++ b/ydb/core/tx/columnshard/tables_manager.cpp @@ -219,7 +219,7 @@ bool TTablesManager::RegisterSchemaPreset(const TSchemaPreset& schemaPreset, NIc return true; } -void TTablesManager::AddPresetVersion(const ui32 presetId, const TRowVersion& version, const NKikimrSchemeOp::TColumnTableSchema& schema, NIceDb::TNiceDb& db) { +void TTablesManager::AddSchemaVersion(const ui32 presetId, const TRowVersion& version, const NKikimrSchemeOp::TColumnTableSchema& schema, NIceDb::TNiceDb& db) { Y_ABORT_UNLESS(SchemaPresets.contains(presetId)); auto preset = SchemaPresets.at(presetId); @@ -250,10 +250,10 @@ void TTablesManager::AddTableVersion(const ui64 pathId, const TRowVersion& versi if (SchemaPresets.empty()) { TSchemaPreset fakePreset; Y_ABORT_UNLESS(RegisterSchemaPreset(fakePreset, db)); - AddPresetVersion(fakePreset.GetId(), version, versionInfo.GetSchema(), db); + AddSchemaVersion(fakePreset.GetId(), version, versionInfo.GetSchema(), db); } else { Y_ABORT_UNLESS(SchemaPresets.contains(fakePreset.GetId())); - AddPresetVersion(fakePreset.GetId(), version, versionInfo.GetSchema(), db); + AddSchemaVersion(fakePreset.GetId(), version, versionInfo.GetSchema(), db); } } @@ -267,7 +267,6 @@ void TTablesManager::AddTableVersion(const ui64 pathId, const TRowVersion& versi if (PrimaryIndex) { PrimaryIndex->OnTieringModified(nullptr, Ttl); } - } Schema::SaveTableVersionInfo(db, pathId, version, versionInfo); table.AddVersion(version, versionInfo); @@ -280,12 +279,8 @@ void TTablesManager::IndexSchemaVersion(const TRowVersion& version, const NKikim const bool isFirstPrimaryIndexInitialization = !PrimaryIndex; if (!PrimaryIndex) { PrimaryIndex = std::make_unique<NOlap::TColumnEngineForLogs>(TabletId, NOlap::TCompactionLimits(), StoragesManager); - } else { - const NOlap::TIndexInfo& lastIndexInfo = PrimaryIndex->GetVersionedIndex().GetLastSchema()->GetIndexInfo(); - Y_ABORT_UNLESS(lastIndexInfo.GetReplaceKey()->Equals(indexInfo.GetReplaceKey())); - Y_ABORT_UNLESS(lastIndexInfo.GetIndexKey()->Equals(indexInfo.GetIndexKey())); } - PrimaryIndex->UpdateDefaultSchema(snapshot, std::move(indexInfo)); + PrimaryIndex->RegisterSchemaVersion(snapshot, std::move(indexInfo)); if (isFirstPrimaryIndexInitialization) { for (auto&& i : Tables) { PrimaryIndex->RegisterTable(i.first); diff --git a/ydb/core/tx/columnshard/tables_manager.h b/ydb/core/tx/columnshard/tables_manager.h index 420bbb6a29..4642389721 100644 --- a/ydb/core/tx/columnshard/tables_manager.h +++ b/ydb/core/tx/columnshard/tables_manager.h @@ -208,7 +208,7 @@ public: void RegisterTable(TTableInfo&& table, NIceDb::TNiceDb& db); bool RegisterSchemaPreset(const TSchemaPreset& schemaPreset, NIceDb::TNiceDb& db); - void AddPresetVersion(const ui32 presetId, const TRowVersion& version, const NKikimrSchemeOp::TColumnTableSchema& schema, NIceDb::TNiceDb& db); + void AddSchemaVersion(const ui32 presetId, const TRowVersion& version, const NKikimrSchemeOp::TColumnTableSchema& schema, NIceDb::TNiceDb& db); void AddTableVersion(const ui64 pathId, const TRowVersion& version, const TTableInfo::TTableVersionInfo& versionInfo, NIceDb::TNiceDb& db); private: void IndexSchemaVersion(const TRowVersion& version, const NKikimrSchemeOp::TColumnTableSchema& schema); |