diff options
author | nsofya <[email protected]> | 2023-10-03 19:28:14 +0300 |
---|---|---|
committer | nsofya <[email protected]> | 2023-10-03 20:00:48 +0300 |
commit | 4edcebfa42188a586a72b63aa2b7da7cc6cba9b1 (patch) | |
tree | ccf6fe87935dff70f211c22d05ede0be7b8a9dc7 | |
parent | c850b4710e1c140fc5bbe23670ee03964f08987a (diff) |
KIKIMR-19263: Use schema version instead of schema snapshot
12 files changed, 28 insertions, 42 deletions
diff --git a/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp b/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp index 56b9de2a0f6..1c6db661dcb 100644 --- a/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp +++ b/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp @@ -15,7 +15,7 @@ bool TTxWrite::InsertOneBlob(TTransactionContext& txc, const TEvPrivate::TEvWrit auto tableSchema = Self->TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetSchemaUnsafe(PutBlobResult->Get()->GetSchemaVersion()); - NOlap::TInsertedData insertData((ui64)writeId, writeMeta.GetTableId(), writeMeta.GetDedupId(), blobRange, meta, tableSchema->GetSnapshot(), blob); + NOlap::TInsertedData insertData((ui64)writeId, writeMeta.GetTableId(), writeMeta.GetDedupId(), blobRange, meta, tableSchema->GetVersion(), blob); bool ok = Self->InsertTable->Insert(dbTable, std::move(insertData)); if (ok) { // Put new data into blob cache diff --git a/ydb/core/tx/columnshard/columnshard_schema.cpp b/ydb/core/tx/columnshard/columnshard_schema.cpp index 980b9938ad0..7ff8a2996c7 100644 --- a/ydb/core/tx/columnshard/columnshard_schema.cpp +++ b/ydb/core/tx/columnshard/columnshard_schema.cpp @@ -39,11 +39,7 @@ bool Schema::InsertTable_Load(NIceDb::TNiceDb& db, const IBlobGroupSelector* dsG TString dedupId = rowset.GetValue<InsertTable::DedupId>(); TString strBlobId = rowset.GetValue<InsertTable::BlobId>(); TString metaStr = rowset.GetValue<InsertTable::Meta>(); - - Y_VERIFY(rowset.HaveValue<InsertTable::IndexPlanStep>()); - ui64 indexPlanStep = rowset.GetValue<InsertTable::IndexPlanStep>(); - ui64 indexTxId = rowset.GetValue<InsertTable::IndexTxId>(); - const NOlap::TSnapshot indexSnapshot(indexPlanStep, indexTxId); + ui64 schemaVersion = rowset.HaveValue<InsertTable::SchemaVersion>() ? rowset.GetValue<InsertTable::SchemaVersion>() : 0; TString error; NOlap::TUnifiedBlobId blobId = NOlap::TUnifiedBlobId::ParseFromString(strBlobId, dsGroupSelector, error); @@ -53,7 +49,7 @@ bool Schema::InsertTable_Load(NIceDb::TNiceDb& db, const IBlobGroupSelector* dsG if (metaStr) { Y_VERIFY(meta.ParseFromString(metaStr)); } - TInsertedData data(planStep, writeTxId, pathId, dedupId, NOlap::TBlobRange(blobId, 0, blobId.BlobSize()), meta, indexSnapshot, {}); + TInsertedData data(planStep, writeTxId, pathId, dedupId, NOlap::TBlobRange(blobId, 0, blobId.BlobSize()), meta, schemaVersion, {}); switch (recType) { case EInsertTableIds::Inserted: diff --git a/ydb/core/tx/columnshard/columnshard_schema.h b/ydb/core/tx/columnshard/columnshard_schema.h index 4910ce98117..bbf838f6eb6 100644 --- a/ydb/core/tx/columnshard/columnshard_schema.h +++ b/ydb/core/tx/columnshard/columnshard_schema.h @@ -212,9 +212,10 @@ struct Schema : NIceDb::Schema { struct Meta : Column<7, NScheme::NTypeIds::String> {}; struct IndexPlanStep : Column<8, NScheme::NTypeIds::Uint64> {}; struct IndexTxId : Column<9, NScheme::NTypeIds::Uint64> {}; + struct SchemaVersion : Column<10, NScheme::NTypeIds::Uint64> {}; using TKey = TableKey<Committed, PlanStep, WriteTxId, PathId, DedupId>; - using TColumns = TableColumns<Committed, PlanStep, WriteTxId, PathId, DedupId, BlobId, Meta, IndexPlanStep, IndexTxId>; + using TColumns = TableColumns<Committed, PlanStep, WriteTxId, PathId, DedupId, BlobId, Meta, IndexPlanStep, IndexTxId, SchemaVersion>; }; struct IndexGranules : NIceDb::Schema::Table<GranulesTableId> { @@ -463,12 +464,10 @@ struct Schema : NIceDb::Schema { // InsertTable activities static void InsertTable_Upsert(NIceDb::TNiceDb& db, EInsertTableIds recType, const TInsertedData& data) { - Y_VERIFY(data.GetSchemaSnapshot().Valid()); db.Table<InsertTable>().Key((ui8)recType, data.PlanStep, data.WriteTxId, data.PathId, data.DedupId).Update( NIceDb::TUpdate<InsertTable::BlobId>(data.GetBlobRange().GetBlobId().ToStringLegacy()), NIceDb::TUpdate<InsertTable::Meta>(data.GetMeta().SerializeToProto().SerializeAsString()), - NIceDb::TUpdate<InsertTable::IndexPlanStep>(data.GetSchemaSnapshot().GetPlanStep()), - NIceDb::TUpdate<InsertTable::IndexTxId>(data.GetSchemaSnapshot().GetTxId()) + NIceDb::TUpdate<InsertTable::SchemaVersion>(data.GetSchemaVersion()) ); } diff --git a/ydb/core/tx/columnshard/engines/changes/indexation.cpp b/ydb/core/tx/columnshard/engines/changes/indexation.cpp index c8ed28bfd5f..2d3be5d8839 100644 --- a/ydb/core/tx/columnshard/engines/changes/indexation.cpp +++ b/ydb/core/tx/columnshard/engines/changes/indexation.cpp @@ -85,7 +85,7 @@ TConclusionStatus TInsertColumnEngineChanges::DoConstructBlobs(TConstructionCont for (auto& inserted : DataToIndex) { const TBlobRange& blobRange = inserted.GetBlobRange(); - auto blobSchema = context.SchemaVersions.GetSchema(inserted.GetSchemaSnapshot()); + auto blobSchema = context.SchemaVersions.GetSchema(inserted.GetSchemaVersion()); auto& indexInfo = blobSchema->GetIndexInfo(); Y_VERIFY(indexInfo.IsSorted()); diff --git a/ydb/core/tx/columnshard/engines/insert_table/data.cpp b/ydb/core/tx/columnshard/engines/insert_table/data.cpp index 18f41acfdc9..f1f6c78e5e2 100644 --- a/ydb/core/tx/columnshard/engines/insert_table/data.cpp +++ b/ydb/core/tx/columnshard/engines/insert_table/data.cpp @@ -35,7 +35,7 @@ TInsertedData::~TInsertedData() { } TInsertedData::TInsertedData(ui64 planStep, ui64 writeTxId, ui64 pathId, TString dedupId, const TBlobRange& blobRange, - const NKikimrTxColumnShard::TLogicalMetadata& proto, const TSnapshot& schemaVersion, + const NKikimrTxColumnShard::TLogicalMetadata& proto, const ui64 schemaVersion, const std::optional<TString>& blobData /*= {}*/) : Meta(proto) , BlobRange(blobRange) @@ -51,7 +51,6 @@ TInsertedData::TInsertedData(ui64 planStep, ui64 writeTxId, ui64 pathId, TString BlobDataGuard = std::make_shared<TBlobStorageGuard>(*blobData); } } - Y_VERIFY(SchemaVersion.Valid()); } } diff --git a/ydb/core/tx/columnshard/engines/insert_table/data.h b/ydb/core/tx/columnshard/engines/insert_table/data.h index d891fbca17e..4842b27baef 100644 --- a/ydb/core/tx/columnshard/engines/insert_table/data.h +++ b/ydb/core/tx/columnshard/engines/insert_table/data.h @@ -28,8 +28,9 @@ public: ui64 WriteTxId = 0; ui64 PathId = 0; TString DedupId; + private: - TSnapshot SchemaVersion = TSnapshot::Zero(); + YDB_READONLY(ui64, SchemaVersion, 0); public: std::optional<TString> GetBlobData() const { if (BlobDataGuard) { @@ -46,16 +47,15 @@ public: TInsertedData() = delete; // avoid invalid TInsertedData anywhere TInsertedData(ui64 planStep, ui64 writeTxId, ui64 pathId, TString dedupId, const TBlobRange& blobRange, - const NKikimrTxColumnShard::TLogicalMetadata& proto, const TSnapshot& schemaVersion, const std::optional<TString>& blobData); + const NKikimrTxColumnShard::TLogicalMetadata& proto, const ui64 schemaVersion, const std::optional<TString>& blobData); TInsertedData(ui64 writeTxId, ui64 pathId, TString dedupId, const TBlobRange& blobRange, - const NKikimrTxColumnShard::TLogicalMetadata& proto, const TSnapshot& schemaVersion, const std::optional<TString>& blobData) + const NKikimrTxColumnShard::TLogicalMetadata& proto, const ui64 schemaVersion, const std::optional<TString>& blobData) : TInsertedData(0, writeTxId, pathId, dedupId, blobRange, proto, schemaVersion, blobData) {} - TInsertedData(ui64 writeTxId, ui64 pathId, TString dedupId, const TUnifiedBlobId& blobId, - const NKikimrTxColumnShard::TLogicalMetadata& proto, const TSnapshot& schemaVersion, const std::optional<TString>& blobData) + const NKikimrTxColumnShard::TLogicalMetadata& proto, const ui64 schemaVersion, const std::optional<TString>& blobData) : TInsertedData(0, writeTxId, pathId, dedupId, TBlobRange(blobId, 0, blobId.BlobSize()), proto, schemaVersion, blobData) { } @@ -118,10 +118,6 @@ public: return TSnapshot(PlanStep, WriteTxId); } - const TSnapshot& GetSchemaSnapshot() const { - return SchemaVersion; - } - ui32 BlobSize() const { return BlobRange.GetBlobSize(); } }; @@ -130,7 +126,7 @@ class TCommittedBlob { private: TBlobRange BlobRange; TSnapshot CommitSnapshot; - TSnapshot SchemaSnapshot; + YDB_READONLY_DEF(ui64, SchemaVersion); YDB_READONLY_DEF(std::optional<NArrow::TReplaceKey>, First); YDB_READONLY_DEF(std::optional<NArrow::TReplaceKey>, Last); public: @@ -144,10 +140,10 @@ public: return *Last; } - TCommittedBlob(const TBlobRange& blobRange, const TSnapshot& snapshot, const TSnapshot& schemaSnapshot, const std::optional<NArrow::TReplaceKey>& first, const std::optional<NArrow::TReplaceKey>& last) + TCommittedBlob(const TBlobRange& blobRange, const TSnapshot& snapshot, const ui64 schemaVersion, const std::optional<NArrow::TReplaceKey>& first, const std::optional<NArrow::TReplaceKey>& last) : BlobRange(blobRange) , CommitSnapshot(snapshot) - , SchemaSnapshot(schemaSnapshot) + , SchemaVersion(schemaVersion) , First(first) , Last(last) {} @@ -164,10 +160,6 @@ public: return CommitSnapshot; } - const TSnapshot& GetSchemaSnapshot() const { - return SchemaSnapshot; - } - const TBlobRange& GetBlobRange() const { return BlobRange; } diff --git a/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp b/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp index 87a15a7d3bb..8bcbfb38df9 100644 --- a/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp +++ b/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp @@ -126,7 +126,7 @@ std::vector<TCommittedBlob> TInsertTable::Read(ui64 pathId, const TSnapshot& sna std::vector<TCommittedBlob> result; result.reserve(ret.size()); for (auto&& i : ret) { - result.emplace_back(TCommittedBlob(i->GetBlobRange(), i->GetSnapshot(), i->GetSchemaSnapshot(), i->GetMeta().GetMin(pkSchema), i->GetMeta().GetMax(pkSchema))); + result.emplace_back(TCommittedBlob(i->GetBlobRange(), i->GetSnapshot(), i->GetSchemaVersion(), i->GetMeta().GetMin(pkSchema), i->GetMeta().GetMax(pkSchema))); } return result; diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/committed_assembler.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/committed_assembler.cpp index fb72af0d520..a424d87cad4 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/committed_assembler.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/committed_assembler.cpp @@ -4,7 +4,7 @@ namespace NKikimr::NOlap::NPlainReader { bool TCommittedAssembler::DoExecute() { - ResultBatch = NArrow::DeserializeBatch(BlobData, ReadMetadata->GetBlobSchema(SchemaSnapshot)); + ResultBatch = NArrow::DeserializeBatch(BlobData, ReadMetadata->GetBlobSchema(SchemaVersion)); Y_VERIFY(ResultBatch); ResultBatch = ReadMetadata->GetIndexInfo().AddSpecialColumns(ResultBatch, DataSnapshot); Y_VERIFY(ResultBatch); @@ -26,7 +26,7 @@ TCommittedAssembler::TCommittedAssembler(const NActors::TActorId& scanActorId, c , BlobData(blobData) , ReadMetadata(readMetadata) , SourceIdx(sourceIdx) - , SchemaSnapshot(cBlob.GetSchemaSnapshot()) + , SchemaVersion(cBlob.GetSchemaVersion()) , DataSnapshot(cBlob.GetSnapshot()) { } diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/committed_assembler.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/committed_assembler.h index 2dc0fdff17a..f0b00d990c3 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/committed_assembler.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/committed_assembler.h @@ -13,7 +13,7 @@ private: TString BlobData; TReadMetadata::TConstPtr ReadMetadata; const ui32 SourceIdx; - TSnapshot SchemaSnapshot; + ui64 SchemaVersion; TSnapshot DataSnapshot; std::shared_ptr<NArrow::TColumnFilter> EarlyFilter; diff --git a/ydb/core/tx/columnshard/engines/reader/read_metadata.h b/ydb/core/tx/columnshard/engines/reader/read_metadata.h index 5ba8e09235c..d4fbbdb6a74 100644 --- a/ydb/core/tx/columnshard/engines/reader/read_metadata.h +++ b/ydb/core/tx/columnshard/engines/reader/read_metadata.h @@ -189,7 +189,7 @@ public: return it->second; } - std::shared_ptr<arrow::Schema> GetBlobSchema(const TSnapshot& version) const { + std::shared_ptr<arrow::Schema> GetBlobSchema(const ui64 version) const { return IndexVersions.GetSchema(version)->GetIndexInfo().ArrowSchema(); } diff --git a/ydb/core/tx/columnshard/engines/ut_insert_table.cpp b/ydb/core/tx/columnshard/engines/ut_insert_table.cpp index b14b5b6a18a..fdb9d385931 100644 --- a/ydb/core/tx/columnshard/engines/ut_insert_table.cpp +++ b/ydb/core/tx/columnshard/engines/ut_insert_table.cpp @@ -51,7 +51,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestInsertTable) { TTestInsertTableDB dbTable; TInsertTable insertTable; - TSnapshot indexSnapshot(1, 1); + ui64 indexSnapshot = 0; // insert, not commited bool ok = insertTable.Insert(dbTable, TInsertedData(writeId, tableId, dedupId, blobId1, {}, indexSnapshot, {})); diff --git a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp index 379b43177b1..bdeface567a 100644 --- a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp +++ b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp @@ -442,8 +442,8 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { engine.Load(db, lostBlobs); std::vector<TInsertedData> dataToIndex = { - TInsertedData(2, paths[0], "", blobRanges[0].BlobId, {}, indexSnaphot, {}), - TInsertedData(1, paths[0], "", blobRanges[1].BlobId, {}, indexSnaphot, {}) + TInsertedData(2, paths[0], "", blobRanges[0].BlobId, {}, 0, {}), + TInsertedData(1, paths[0], "", blobRanges[1].BlobId, {}, 0, {}) }; // write @@ -538,7 +538,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { // PlanStep, TxId, PathId, DedupId, BlobId, Data, [Metadata] std::vector<TInsertedData> dataToIndex; dataToIndex.push_back( - TInsertedData(txId, pathId, "", blobRange.BlobId, {}, indexSnapshot, {})); + TInsertedData(txId, pathId, "", blobRange.BlobId, {}, 0, {})); bool ok = Insert(engine, db, TSnapshot(planStep, txId), std::move(dataToIndex), blobs, step); UNIT_ASSERT(ok); @@ -634,7 +634,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { // PlanStep, TxId, PathId, DedupId, BlobId, Data, [Metadata] std::vector<TInsertedData> dataToIndex; dataToIndex.push_back( - TInsertedData(txId, pathId, "", blobRange.BlobId, {}, indexSnapshot, {})); + TInsertedData(txId, pathId, "", blobRange.BlobId, {}, 0, {})); bool ok = Insert(engine, db, TSnapshot(planStep, txId), std::move(dataToIndex), blobs, step); UNIT_ASSERT(ok); @@ -663,7 +663,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { // PlanStep, TxId, PathId, DedupId, BlobId, Data, [Metadata] std::vector<TInsertedData> dataToIndex; dataToIndex.push_back( - TInsertedData(txId, pathId, "", blobRange.BlobId, {}, indexSnapshot, {})); + TInsertedData(txId, pathId, "", blobRange.BlobId, {}, 0, {})); bool ok = Insert(engine, db, TSnapshot(planStep, txId), std::move(dataToIndex), blobs, step); UNIT_ASSERT(ok); @@ -703,7 +703,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { // PlanStep, TxId, PathId, DedupId, BlobId, Data, [Metadata] std::vector<TInsertedData> dataToIndex; dataToIndex.push_back( - TInsertedData(txId, pathId, "", blobRange.BlobId, {}, indexSnapshot, {})); + TInsertedData(txId, pathId, "", blobRange.BlobId, {}, 0, {})); bool ok = Insert(engine, db, TSnapshot(planStep, txId), std::move(dataToIndex), blobs, step); UNIT_ASSERT(ok); |