summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authornsofya <[email protected]>2023-10-03 19:28:14 +0300
committernsofya <[email protected]>2023-10-03 20:00:48 +0300
commit4edcebfa42188a586a72b63aa2b7da7cc6cba9b1 (patch)
treeccf6fe87935dff70f211c22d05ede0be7b8a9dc7
parentc850b4710e1c140fc5bbe23670ee03964f08987a (diff)
KIKIMR-19263: Use schema version instead of schema snapshot
-rw-r--r--ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp2
-rw-r--r--ydb/core/tx/columnshard/columnshard_schema.cpp8
-rw-r--r--ydb/core/tx/columnshard/columnshard_schema.h7
-rw-r--r--ydb/core/tx/columnshard/engines/changes/indexation.cpp2
-rw-r--r--ydb/core/tx/columnshard/engines/insert_table/data.cpp3
-rw-r--r--ydb/core/tx/columnshard/engines/insert_table/data.h24
-rw-r--r--ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp2
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/committed_assembler.cpp4
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/committed_assembler.h2
-rw-r--r--ydb/core/tx/columnshard/engines/reader/read_metadata.h2
-rw-r--r--ydb/core/tx/columnshard/engines/ut_insert_table.cpp2
-rw-r--r--ydb/core/tx/columnshard/engines/ut_logs_engine.cpp12
12 files changed, 28 insertions, 42 deletions
diff --git a/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp b/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp
index 56b9de2a0f6..1c6db661dcb 100644
--- a/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp
+++ b/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp
@@ -15,7 +15,7 @@ bool TTxWrite::InsertOneBlob(TTransactionContext& txc, const TEvPrivate::TEvWrit
auto tableSchema = Self->TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetSchemaUnsafe(PutBlobResult->Get()->GetSchemaVersion());
- NOlap::TInsertedData insertData((ui64)writeId, writeMeta.GetTableId(), writeMeta.GetDedupId(), blobRange, meta, tableSchema->GetSnapshot(), blob);
+ NOlap::TInsertedData insertData((ui64)writeId, writeMeta.GetTableId(), writeMeta.GetDedupId(), blobRange, meta, tableSchema->GetVersion(), blob);
bool ok = Self->InsertTable->Insert(dbTable, std::move(insertData));
if (ok) {
// Put new data into blob cache
diff --git a/ydb/core/tx/columnshard/columnshard_schema.cpp b/ydb/core/tx/columnshard/columnshard_schema.cpp
index 980b9938ad0..7ff8a2996c7 100644
--- a/ydb/core/tx/columnshard/columnshard_schema.cpp
+++ b/ydb/core/tx/columnshard/columnshard_schema.cpp
@@ -39,11 +39,7 @@ bool Schema::InsertTable_Load(NIceDb::TNiceDb& db, const IBlobGroupSelector* dsG
TString dedupId = rowset.GetValue<InsertTable::DedupId>();
TString strBlobId = rowset.GetValue<InsertTable::BlobId>();
TString metaStr = rowset.GetValue<InsertTable::Meta>();
-
- Y_VERIFY(rowset.HaveValue<InsertTable::IndexPlanStep>());
- ui64 indexPlanStep = rowset.GetValue<InsertTable::IndexPlanStep>();
- ui64 indexTxId = rowset.GetValue<InsertTable::IndexTxId>();
- const NOlap::TSnapshot indexSnapshot(indexPlanStep, indexTxId);
+ ui64 schemaVersion = rowset.HaveValue<InsertTable::SchemaVersion>() ? rowset.GetValue<InsertTable::SchemaVersion>() : 0;
TString error;
NOlap::TUnifiedBlobId blobId = NOlap::TUnifiedBlobId::ParseFromString(strBlobId, dsGroupSelector, error);
@@ -53,7 +49,7 @@ bool Schema::InsertTable_Load(NIceDb::TNiceDb& db, const IBlobGroupSelector* dsG
if (metaStr) {
Y_VERIFY(meta.ParseFromString(metaStr));
}
- TInsertedData data(planStep, writeTxId, pathId, dedupId, NOlap::TBlobRange(blobId, 0, blobId.BlobSize()), meta, indexSnapshot, {});
+ TInsertedData data(planStep, writeTxId, pathId, dedupId, NOlap::TBlobRange(blobId, 0, blobId.BlobSize()), meta, schemaVersion, {});
switch (recType) {
case EInsertTableIds::Inserted:
diff --git a/ydb/core/tx/columnshard/columnshard_schema.h b/ydb/core/tx/columnshard/columnshard_schema.h
index 4910ce98117..bbf838f6eb6 100644
--- a/ydb/core/tx/columnshard/columnshard_schema.h
+++ b/ydb/core/tx/columnshard/columnshard_schema.h
@@ -212,9 +212,10 @@ struct Schema : NIceDb::Schema {
struct Meta : Column<7, NScheme::NTypeIds::String> {};
struct IndexPlanStep : Column<8, NScheme::NTypeIds::Uint64> {};
struct IndexTxId : Column<9, NScheme::NTypeIds::Uint64> {};
+ struct SchemaVersion : Column<10, NScheme::NTypeIds::Uint64> {};
using TKey = TableKey<Committed, PlanStep, WriteTxId, PathId, DedupId>;
- using TColumns = TableColumns<Committed, PlanStep, WriteTxId, PathId, DedupId, BlobId, Meta, IndexPlanStep, IndexTxId>;
+ using TColumns = TableColumns<Committed, PlanStep, WriteTxId, PathId, DedupId, BlobId, Meta, IndexPlanStep, IndexTxId, SchemaVersion>;
};
struct IndexGranules : NIceDb::Schema::Table<GranulesTableId> {
@@ -463,12 +464,10 @@ struct Schema : NIceDb::Schema {
// InsertTable activities
static void InsertTable_Upsert(NIceDb::TNiceDb& db, EInsertTableIds recType, const TInsertedData& data) {
- Y_VERIFY(data.GetSchemaSnapshot().Valid());
db.Table<InsertTable>().Key((ui8)recType, data.PlanStep, data.WriteTxId, data.PathId, data.DedupId).Update(
NIceDb::TUpdate<InsertTable::BlobId>(data.GetBlobRange().GetBlobId().ToStringLegacy()),
NIceDb::TUpdate<InsertTable::Meta>(data.GetMeta().SerializeToProto().SerializeAsString()),
- NIceDb::TUpdate<InsertTable::IndexPlanStep>(data.GetSchemaSnapshot().GetPlanStep()),
- NIceDb::TUpdate<InsertTable::IndexTxId>(data.GetSchemaSnapshot().GetTxId())
+ NIceDb::TUpdate<InsertTable::SchemaVersion>(data.GetSchemaVersion())
);
}
diff --git a/ydb/core/tx/columnshard/engines/changes/indexation.cpp b/ydb/core/tx/columnshard/engines/changes/indexation.cpp
index c8ed28bfd5f..2d3be5d8839 100644
--- a/ydb/core/tx/columnshard/engines/changes/indexation.cpp
+++ b/ydb/core/tx/columnshard/engines/changes/indexation.cpp
@@ -85,7 +85,7 @@ TConclusionStatus TInsertColumnEngineChanges::DoConstructBlobs(TConstructionCont
for (auto& inserted : DataToIndex) {
const TBlobRange& blobRange = inserted.GetBlobRange();
- auto blobSchema = context.SchemaVersions.GetSchema(inserted.GetSchemaSnapshot());
+ auto blobSchema = context.SchemaVersions.GetSchema(inserted.GetSchemaVersion());
auto& indexInfo = blobSchema->GetIndexInfo();
Y_VERIFY(indexInfo.IsSorted());
diff --git a/ydb/core/tx/columnshard/engines/insert_table/data.cpp b/ydb/core/tx/columnshard/engines/insert_table/data.cpp
index 18f41acfdc9..f1f6c78e5e2 100644
--- a/ydb/core/tx/columnshard/engines/insert_table/data.cpp
+++ b/ydb/core/tx/columnshard/engines/insert_table/data.cpp
@@ -35,7 +35,7 @@ TInsertedData::~TInsertedData() {
}
TInsertedData::TInsertedData(ui64 planStep, ui64 writeTxId, ui64 pathId, TString dedupId, const TBlobRange& blobRange,
- const NKikimrTxColumnShard::TLogicalMetadata& proto, const TSnapshot& schemaVersion,
+ const NKikimrTxColumnShard::TLogicalMetadata& proto, const ui64 schemaVersion,
const std::optional<TString>& blobData /*= {}*/)
: Meta(proto)
, BlobRange(blobRange)
@@ -51,7 +51,6 @@ TInsertedData::TInsertedData(ui64 planStep, ui64 writeTxId, ui64 pathId, TString
BlobDataGuard = std::make_shared<TBlobStorageGuard>(*blobData);
}
}
- Y_VERIFY(SchemaVersion.Valid());
}
}
diff --git a/ydb/core/tx/columnshard/engines/insert_table/data.h b/ydb/core/tx/columnshard/engines/insert_table/data.h
index d891fbca17e..4842b27baef 100644
--- a/ydb/core/tx/columnshard/engines/insert_table/data.h
+++ b/ydb/core/tx/columnshard/engines/insert_table/data.h
@@ -28,8 +28,9 @@ public:
ui64 WriteTxId = 0;
ui64 PathId = 0;
TString DedupId;
+
private:
- TSnapshot SchemaVersion = TSnapshot::Zero();
+ YDB_READONLY(ui64, SchemaVersion, 0);
public:
std::optional<TString> GetBlobData() const {
if (BlobDataGuard) {
@@ -46,16 +47,15 @@ public:
TInsertedData() = delete; // avoid invalid TInsertedData anywhere
TInsertedData(ui64 planStep, ui64 writeTxId, ui64 pathId, TString dedupId, const TBlobRange& blobRange,
- const NKikimrTxColumnShard::TLogicalMetadata& proto, const TSnapshot& schemaVersion, const std::optional<TString>& blobData);
+ const NKikimrTxColumnShard::TLogicalMetadata& proto, const ui64 schemaVersion, const std::optional<TString>& blobData);
TInsertedData(ui64 writeTxId, ui64 pathId, TString dedupId, const TBlobRange& blobRange,
- const NKikimrTxColumnShard::TLogicalMetadata& proto, const TSnapshot& schemaVersion, const std::optional<TString>& blobData)
+ const NKikimrTxColumnShard::TLogicalMetadata& proto, const ui64 schemaVersion, const std::optional<TString>& blobData)
: TInsertedData(0, writeTxId, pathId, dedupId, blobRange, proto, schemaVersion, blobData)
{}
-
TInsertedData(ui64 writeTxId, ui64 pathId, TString dedupId, const TUnifiedBlobId& blobId,
- const NKikimrTxColumnShard::TLogicalMetadata& proto, const TSnapshot& schemaVersion, const std::optional<TString>& blobData)
+ const NKikimrTxColumnShard::TLogicalMetadata& proto, const ui64 schemaVersion, const std::optional<TString>& blobData)
: TInsertedData(0, writeTxId, pathId, dedupId, TBlobRange(blobId, 0, blobId.BlobSize()), proto, schemaVersion, blobData)
{
}
@@ -118,10 +118,6 @@ public:
return TSnapshot(PlanStep, WriteTxId);
}
- const TSnapshot& GetSchemaSnapshot() const {
- return SchemaVersion;
- }
-
ui32 BlobSize() const { return BlobRange.GetBlobSize(); }
};
@@ -130,7 +126,7 @@ class TCommittedBlob {
private:
TBlobRange BlobRange;
TSnapshot CommitSnapshot;
- TSnapshot SchemaSnapshot;
+ YDB_READONLY_DEF(ui64, SchemaVersion);
YDB_READONLY_DEF(std::optional<NArrow::TReplaceKey>, First);
YDB_READONLY_DEF(std::optional<NArrow::TReplaceKey>, Last);
public:
@@ -144,10 +140,10 @@ public:
return *Last;
}
- TCommittedBlob(const TBlobRange& blobRange, const TSnapshot& snapshot, const TSnapshot& schemaSnapshot, const std::optional<NArrow::TReplaceKey>& first, const std::optional<NArrow::TReplaceKey>& last)
+ TCommittedBlob(const TBlobRange& blobRange, const TSnapshot& snapshot, const ui64 schemaVersion, const std::optional<NArrow::TReplaceKey>& first, const std::optional<NArrow::TReplaceKey>& last)
: BlobRange(blobRange)
, CommitSnapshot(snapshot)
- , SchemaSnapshot(schemaSnapshot)
+ , SchemaVersion(schemaVersion)
, First(first)
, Last(last)
{}
@@ -164,10 +160,6 @@ public:
return CommitSnapshot;
}
- const TSnapshot& GetSchemaSnapshot() const {
- return SchemaSnapshot;
- }
-
const TBlobRange& GetBlobRange() const {
return BlobRange;
}
diff --git a/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp b/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp
index 87a15a7d3bb..8bcbfb38df9 100644
--- a/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp
+++ b/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp
@@ -126,7 +126,7 @@ std::vector<TCommittedBlob> TInsertTable::Read(ui64 pathId, const TSnapshot& sna
std::vector<TCommittedBlob> result;
result.reserve(ret.size());
for (auto&& i : ret) {
- result.emplace_back(TCommittedBlob(i->GetBlobRange(), i->GetSnapshot(), i->GetSchemaSnapshot(), i->GetMeta().GetMin(pkSchema), i->GetMeta().GetMax(pkSchema)));
+ result.emplace_back(TCommittedBlob(i->GetBlobRange(), i->GetSnapshot(), i->GetSchemaVersion(), i->GetMeta().GetMin(pkSchema), i->GetMeta().GetMax(pkSchema)));
}
return result;
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/committed_assembler.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/committed_assembler.cpp
index fb72af0d520..a424d87cad4 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/committed_assembler.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/committed_assembler.cpp
@@ -4,7 +4,7 @@
namespace NKikimr::NOlap::NPlainReader {
bool TCommittedAssembler::DoExecute() {
- ResultBatch = NArrow::DeserializeBatch(BlobData, ReadMetadata->GetBlobSchema(SchemaSnapshot));
+ ResultBatch = NArrow::DeserializeBatch(BlobData, ReadMetadata->GetBlobSchema(SchemaVersion));
Y_VERIFY(ResultBatch);
ResultBatch = ReadMetadata->GetIndexInfo().AddSpecialColumns(ResultBatch, DataSnapshot);
Y_VERIFY(ResultBatch);
@@ -26,7 +26,7 @@ TCommittedAssembler::TCommittedAssembler(const NActors::TActorId& scanActorId, c
, BlobData(blobData)
, ReadMetadata(readMetadata)
, SourceIdx(sourceIdx)
- , SchemaSnapshot(cBlob.GetSchemaSnapshot())
+ , SchemaVersion(cBlob.GetSchemaVersion())
, DataSnapshot(cBlob.GetSnapshot())
{
}
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/committed_assembler.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/committed_assembler.h
index 2dc0fdff17a..f0b00d990c3 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/committed_assembler.h
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/committed_assembler.h
@@ -13,7 +13,7 @@ private:
TString BlobData;
TReadMetadata::TConstPtr ReadMetadata;
const ui32 SourceIdx;
- TSnapshot SchemaSnapshot;
+ ui64 SchemaVersion;
TSnapshot DataSnapshot;
std::shared_ptr<NArrow::TColumnFilter> EarlyFilter;
diff --git a/ydb/core/tx/columnshard/engines/reader/read_metadata.h b/ydb/core/tx/columnshard/engines/reader/read_metadata.h
index 5ba8e09235c..d4fbbdb6a74 100644
--- a/ydb/core/tx/columnshard/engines/reader/read_metadata.h
+++ b/ydb/core/tx/columnshard/engines/reader/read_metadata.h
@@ -189,7 +189,7 @@ public:
return it->second;
}
- std::shared_ptr<arrow::Schema> GetBlobSchema(const TSnapshot& version) const {
+ std::shared_ptr<arrow::Schema> GetBlobSchema(const ui64 version) const {
return IndexVersions.GetSchema(version)->GetIndexInfo().ArrowSchema();
}
diff --git a/ydb/core/tx/columnshard/engines/ut_insert_table.cpp b/ydb/core/tx/columnshard/engines/ut_insert_table.cpp
index b14b5b6a18a..fdb9d385931 100644
--- a/ydb/core/tx/columnshard/engines/ut_insert_table.cpp
+++ b/ydb/core/tx/columnshard/engines/ut_insert_table.cpp
@@ -51,7 +51,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestInsertTable) {
TTestInsertTableDB dbTable;
TInsertTable insertTable;
- TSnapshot indexSnapshot(1, 1);
+ ui64 indexSnapshot = 0;
// insert, not commited
bool ok = insertTable.Insert(dbTable, TInsertedData(writeId, tableId, dedupId, blobId1, {}, indexSnapshot, {}));
diff --git a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
index 379b43177b1..bdeface567a 100644
--- a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
+++ b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
@@ -442,8 +442,8 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
engine.Load(db, lostBlobs);
std::vector<TInsertedData> dataToIndex = {
- TInsertedData(2, paths[0], "", blobRanges[0].BlobId, {}, indexSnaphot, {}),
- TInsertedData(1, paths[0], "", blobRanges[1].BlobId, {}, indexSnaphot, {})
+ TInsertedData(2, paths[0], "", blobRanges[0].BlobId, {}, 0, {}),
+ TInsertedData(1, paths[0], "", blobRanges[1].BlobId, {}, 0, {})
};
// write
@@ -538,7 +538,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
// PlanStep, TxId, PathId, DedupId, BlobId, Data, [Metadata]
std::vector<TInsertedData> dataToIndex;
dataToIndex.push_back(
- TInsertedData(txId, pathId, "", blobRange.BlobId, {}, indexSnapshot, {}));
+ TInsertedData(txId, pathId, "", blobRange.BlobId, {}, 0, {}));
bool ok = Insert(engine, db, TSnapshot(planStep, txId), std::move(dataToIndex), blobs, step);
UNIT_ASSERT(ok);
@@ -634,7 +634,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
// PlanStep, TxId, PathId, DedupId, BlobId, Data, [Metadata]
std::vector<TInsertedData> dataToIndex;
dataToIndex.push_back(
- TInsertedData(txId, pathId, "", blobRange.BlobId, {}, indexSnapshot, {}));
+ TInsertedData(txId, pathId, "", blobRange.BlobId, {}, 0, {}));
bool ok = Insert(engine, db, TSnapshot(planStep, txId), std::move(dataToIndex), blobs, step);
UNIT_ASSERT(ok);
@@ -663,7 +663,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
// PlanStep, TxId, PathId, DedupId, BlobId, Data, [Metadata]
std::vector<TInsertedData> dataToIndex;
dataToIndex.push_back(
- TInsertedData(txId, pathId, "", blobRange.BlobId, {}, indexSnapshot, {}));
+ TInsertedData(txId, pathId, "", blobRange.BlobId, {}, 0, {}));
bool ok = Insert(engine, db, TSnapshot(planStep, txId), std::move(dataToIndex), blobs, step);
UNIT_ASSERT(ok);
@@ -703,7 +703,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
// PlanStep, TxId, PathId, DedupId, BlobId, Data, [Metadata]
std::vector<TInsertedData> dataToIndex;
dataToIndex.push_back(
- TInsertedData(txId, pathId, "", blobRange.BlobId, {}, indexSnapshot, {}));
+ TInsertedData(txId, pathId, "", blobRange.BlobId, {}, 0, {}));
bool ok = Insert(engine, db, TSnapshot(planStep, txId), std::move(dataToIndex), blobs, step);
UNIT_ASSERT(ok);