aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorIlnaz Nizametdinov <i.nizametdinov@gmail.com>2022-02-24 19:11:01 +0300
committerIlnaz Nizametdinov <i.nizametdinov@gmail.com>2022-02-24 19:11:01 +0300
commitb3cb262c728243a07046e49f1715b055e8bde804 (patch)
tree561e68f724193f10a2f84a803be79e91591b5a11
parentf3629d0764f1951c227f697c01f92239049902f0 (diff)
downloadydb-b3cb262c728243a07046e49f1715b055e8bde804.tar.gz
Persist schema version KIKIMR-14198
ref:83395efca7ef8489f23048503f32f686cdb78262
-rw-r--r--ydb/core/tx/datashard/change_collector_async_index.cpp8
-rw-r--r--ydb/core/tx/datashard/change_collector_async_index.h2
-rw-r--r--ydb/core/tx/datashard/change_collector_base.cpp11
-rw-r--r--ydb/core/tx/datashard/change_collector_base.h2
-rw-r--r--ydb/core/tx/datashard/change_collector_cdc_stream.cpp18
-rw-r--r--ydb/core/tx/datashard/change_collector_cdc_stream.h4
-rw-r--r--ydb/core/tx/datashard/change_record.cpp5
-rw-r--r--ydb/core/tx/datashard/change_record.h3
-rw-r--r--ydb/core/tx/datashard/datashard.cpp3
-rw-r--r--ydb/core/tx/datashard/datashard_change_sending.cpp1
-rw-r--r--ydb/core/tx/datashard/datashard_impl.h3
-rw-r--r--ydb/core/tx/datashard/datashard_ut_change_collector.cpp11
-rw-r--r--ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_datashard_/flat_datashard.schema20
13 files changed, 61 insertions, 30 deletions
diff --git a/ydb/core/tx/datashard/change_collector_async_index.cpp b/ydb/core/tx/datashard/change_collector_async_index.cpp
index 6d6c068b11..992014280e 100644
--- a/ydb/core/tx/datashard/change_collector_async_index.cpp
+++ b/ydb/core/tx/datashard/change_collector_async_index.cpp
@@ -135,7 +135,7 @@ bool TAsyncIndexChangeCollector::Collect(const TTableId& tableId, ERowOp rop,
}
if (needDeletion) {
- Persist(pathId, ERowOp::Erase, IndexKeyVals, IndexKeyTags, {});
+ Persist(tableId, pathId, ERowOp::Erase, IndexKeyVals, IndexKeyTags, {});
}
Clear();
@@ -178,7 +178,7 @@ bool TAsyncIndexChangeCollector::Collect(const TTableId& tableId, ERowOp rop,
}
if (needUpdate) {
- Persist(pathId, ERowOp::Upsert, IndexKeyVals, IndexKeyTags, IndexDataVals);
+ Persist(tableId, pathId, ERowOp::Upsert, IndexKeyVals, IndexKeyTags, IndexDataVals);
}
Clear();
@@ -283,12 +283,12 @@ void TAsyncIndexChangeCollector::FillDataWithNull(TTag tag, NScheme::TTypeId typ
IndexDataVals.emplace_back(tag, ECellOp::Set, TRawTypeValue({}, type));
}
-void TAsyncIndexChangeCollector::Persist(const TPathId& pathId, ERowOp rop,
+void TAsyncIndexChangeCollector::Persist(const TTableId& tableId, const TPathId& pathId, ERowOp rop,
TArrayRef<const TRawTypeValue> key, TArrayRef<const TTag> keyTags, TArrayRef<const TUpdateOp> updates)
{
NKikimrChangeExchange::TChangeRecord::TDataChange body;
Serialize(body, rop, key, keyTags, updates);
- TBaseChangeCollector::Persist(TChangeRecord::EKind::AsyncIndex, pathId, body);
+ TBaseChangeCollector::Persist(tableId, pathId, TChangeRecord::EKind::AsyncIndex, body);
}
void TAsyncIndexChangeCollector::Clear() {
diff --git a/ydb/core/tx/datashard/change_collector_async_index.h b/ydb/core/tx/datashard/change_collector_async_index.h
index d908dc7ef2..9160f6d899 100644
--- a/ydb/core/tx/datashard/change_collector_async_index.h
+++ b/ydb/core/tx/datashard/change_collector_async_index.h
@@ -42,7 +42,7 @@ class TAsyncIndexChangeCollector: public TBaseChangeCollector {
void FillDataFromUpdate(NTable::TTag tag, NTable::TPos pos, TArrayRef<const NTable::TUpdateOp> updates);
void FillDataWithNull(NTable::TTag tag, NScheme::TTypeId type);
- void Persist(const TPathId& pathId, NTable::ERowOp rop,
+ void Persist(const TTableId& tableId, const TPathId& pathId, NTable::ERowOp rop,
TArrayRef<const TRawTypeValue> key, TArrayRef<const NTable::TTag> keyTags,
TArrayRef<const NTable::TUpdateOp> updates);
diff --git a/ydb/core/tx/datashard/change_collector_base.cpp b/ydb/core/tx/datashard/change_collector_base.cpp
index 1df9173f1f..779d0ce443 100644
--- a/ydb/core/tx/datashard/change_collector_base.cpp
+++ b/ydb/core/tx/datashard/change_collector_base.cpp
@@ -122,19 +122,28 @@ void TBaseChangeCollector::Serialize(TDataChange& out, ERowOp rop,
}
}
-void TBaseChangeCollector::Persist(TChangeRecord::EKind kind, const TPathId& pathId, const TDataChange& body) {
+void TBaseChangeCollector::Persist(
+ const TTableId& tableId, // origin table
+ const TPathId& pathId, // target object (table, stream, etc...)
+ TChangeRecord::EKind kind, const TDataChange& body)
+{
NIceDb::TNiceDb db(Db);
if (!Group) {
Group = Self->AllocateChangeRecordGroup(db);
}
+ Y_VERIFY_S(Self->IsUserTable(tableId), "Unknown table: " << tableId);
+ auto userTable = Self->GetUserTables().at(tableId.PathId.LocalPathId);
+ Y_VERIFY(userTable->GetTableSchemaVersion());
+
auto record = TChangeRecordBuilder(kind)
.WithOrder(Self->AllocateChangeRecordOrder(db))
.WithGroup(*Group)
.WithStep(WriteVersion.Step)
.WithTxId(WriteVersion.TxId)
.WithPathId(pathId)
+ .WithSchemaVersion(userTable->GetTableSchemaVersion())
.WithBody(body.SerializeAsString())
.Build();
diff --git a/ydb/core/tx/datashard/change_collector_base.h b/ydb/core/tx/datashard/change_collector_base.h
index bb04deb29c..1e582ac74e 100644
--- a/ydb/core/tx/datashard/change_collector_base.h
+++ b/ydb/core/tx/datashard/change_collector_base.h
@@ -28,7 +28,7 @@ protected:
TArrayRef<const TRawTypeValue> key, TArrayRef<const NTable::TTag> keyTags,
const NTable::TRowState* oldState, const NTable::TRowState* newState, TArrayRef<const NTable::TTag> valueTags);
- void Persist(TChangeRecord::EKind kind, const TPathId& pathId, const TDataChange& body);
+ void Persist(const TTableId& tableId, const TPathId& pathId, TChangeRecord::EKind kind, const TDataChange& body);
public:
explicit TBaseChangeCollector(TDataShard* self, NTable::TDatabase& db, bool isImmediateTx);
diff --git a/ydb/core/tx/datashard/change_collector_cdc_stream.cpp b/ydb/core/tx/datashard/change_collector_cdc_stream.cpp
index f5cb67d1ba..4fc67adc6e 100644
--- a/ydb/core/tx/datashard/change_collector_cdc_stream.cpp
+++ b/ydb/core/tx/datashard/change_collector_cdc_stream.cpp
@@ -115,10 +115,10 @@ bool TCdcStreamChangeCollector::Collect(const TTableId& tableId, ERowOp rop,
for (const auto& [pathId, stream] : userTable->CdcStreams) {
switch (stream.Mode) {
case NKikimrSchemeOp::ECdcStreamModeKeysOnly:
- Persist(pathId, rop, key, keyTags, {});
+ Persist(tableId, pathId, rop, key, keyTags, {});
break;
case NKikimrSchemeOp::ECdcStreamModeUpdate:
- Persist(pathId, rop, key, keyTags, updates);
+ Persist(tableId, pathId, rop, key, keyTags, updates);
break;
case NKikimrSchemeOp::ECdcStreamModeNewImage:
case NKikimrSchemeOp::ECdcStreamModeOldImage:
@@ -131,14 +131,14 @@ bool TCdcStreamChangeCollector::Collect(const TTableId& tableId, ERowOp rop,
}
if (stream.Mode == NKikimrSchemeOp::ECdcStreamModeOldImage) {
- Persist(pathId, rop, key, keyTags, NullIfErased(&*oldState), nullptr, valueTags);
+ Persist(tableId, pathId, rop, key, keyTags, NullIfErased(&*oldState), nullptr, valueTags);
} else {
const auto newState = PatchState(*oldState, rop, MakeTagToPos(valueTags), MappedUpdates(updates));
if (stream.Mode == NKikimrSchemeOp::ECdcStreamModeNewImage) {
- Persist(pathId, rop, key, keyTags, nullptr, NullIfErased(&newState), valueTags);
+ Persist(tableId, pathId, rop, key, keyTags, nullptr, NullIfErased(&newState), valueTags);
} else {
- Persist(pathId, rop, key, keyTags, NullIfErased(&*oldState), NullIfErased(&newState), valueTags);
+ Persist(tableId, pathId, rop, key, keyTags, NullIfErased(&*oldState), NullIfErased(&newState), valueTags);
}
}
@@ -202,15 +202,15 @@ TRowState TCdcStreamChangeCollector::PatchState(const TRowState& oldState, ERowO
return newState;
}
-void TCdcStreamChangeCollector::Persist(const TPathId& pathId, ERowOp rop,
+void TCdcStreamChangeCollector::Persist(const TTableId& tableId, const TPathId& pathId, ERowOp rop,
TArrayRef<const TRawTypeValue> key, TArrayRef<const TTag> keyTags, TArrayRef<const TUpdateOp> updates)
{
NKikimrChangeExchange::TChangeRecord::TDataChange body;
Serialize(body, rop, key, keyTags, updates);
- TBaseChangeCollector::Persist(TChangeRecord::EKind::CdcDataChange, pathId, body);
+ TBaseChangeCollector::Persist(tableId, pathId, TChangeRecord::EKind::CdcDataChange, body);
}
-void TCdcStreamChangeCollector::Persist(const TPathId& pathId, ERowOp rop,
+void TCdcStreamChangeCollector::Persist(const TTableId& tableId, const TPathId& pathId, ERowOp rop,
TArrayRef<const TRawTypeValue> key, TArrayRef<const TTag> keyTags,
const TRowState* oldState, const TRowState* newState, TArrayRef<const TTag> valueTags)
{
@@ -220,7 +220,7 @@ void TCdcStreamChangeCollector::Persist(const TPathId& pathId, ERowOp rop,
NKikimrChangeExchange::TChangeRecord::TDataChange body;
Serialize(body, rop, key, keyTags, oldState, newState, valueTags);
- TBaseChangeCollector::Persist(TChangeRecord::EKind::CdcDataChange, pathId, body);
+ TBaseChangeCollector::Persist(tableId, pathId, TChangeRecord::EKind::CdcDataChange, body);
}
} // NDataShard
diff --git a/ydb/core/tx/datashard/change_collector_cdc_stream.h b/ydb/core/tx/datashard/change_collector_cdc_stream.h
index e02382d5db..631a91c24e 100644
--- a/ydb/core/tx/datashard/change_collector_cdc_stream.h
+++ b/ydb/core/tx/datashard/change_collector_cdc_stream.h
@@ -13,9 +13,9 @@ class TCdcStreamChangeCollector: public TBaseChangeCollector {
static NTable::TRowState PatchState(const NTable::TRowState& oldState, NTable::ERowOp rop,
const THashMap<NTable::TTag, NTable::TPos>& tagToPos, const THashMap<NTable::TTag, NTable::TUpdateOp>& updates);
- void Persist(const TPathId& pathId, NTable::ERowOp rop,
+ void Persist(const TTableId& tableId, const TPathId& pathId, NTable::ERowOp rop,
TArrayRef<const TRawTypeValue> key, TArrayRef<const NTable::TTag> keyTags, TArrayRef<const NTable::TUpdateOp> updates);
- void Persist(const TPathId& pathId, NTable::ERowOp rop,
+ void Persist(const TTableId& tableId, const TPathId& pathId, NTable::ERowOp rop,
TArrayRef<const TRawTypeValue> key, TArrayRef<const NTable::TTag> keyTags,
const NTable::TRowState* oldState, const NTable::TRowState* newState, TArrayRef<const NTable::TTag> valueTags);
diff --git a/ydb/core/tx/datashard/change_record.cpp b/ydb/core/tx/datashard/change_record.cpp
index 661df31fb0..f191d3c5b0 100644
--- a/ydb/core/tx/datashard/change_record.cpp
+++ b/ydb/core/tx/datashard/change_record.cpp
@@ -103,6 +103,11 @@ TChangeRecordBuilder& TChangeRecordBuilder::WithPathId(const TPathId& pathId) {
return *this;
}
+TChangeRecordBuilder& TChangeRecordBuilder::WithSchemaVersion(ui64 version) {
+ Record.SchemaVersion = version;
+ return *this;
+}
+
TChangeRecordBuilder& TChangeRecordBuilder::WithBody(const TString& body) {
Record.Body = body;
return *this;
diff --git a/ydb/core/tx/datashard/change_record.h b/ydb/core/tx/datashard/change_record.h
index 7c88bb8b66..96b0ceb4f4 100644
--- a/ydb/core/tx/datashard/change_record.h
+++ b/ydb/core/tx/datashard/change_record.h
@@ -30,6 +30,7 @@ public:
ui64 GetStep() const { return Step; }
ui64 GetTxId() const { return TxId; }
const TPathId& GetPathId() const { return PathId; }
+ ui64 GetSchemaVersion() const { return SchemaVersion; }
EKind GetKind() const { return Kind; }
const TString& GetBody() const { return Body; }
i64 GetSeqNo() const;
@@ -46,6 +47,7 @@ private:
ui64 Step;
ui64 TxId;
TPathId PathId;
+ ui64 SchemaVersion;
EKind Kind;
TString Body;
@@ -64,6 +66,7 @@ public:
TChangeRecordBuilder& WithStep(ui64 step);
TChangeRecordBuilder& WithTxId(ui64 txId);
TChangeRecordBuilder& WithPathId(const TPathId& pathId);
+ TChangeRecordBuilder& WithSchemaVersion(ui64 version);
TChangeRecordBuilder& WithBody(const TString& body);
TChangeRecordBuilder& WithBody(TString&& body);
diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp
index d86568b4a1..e636d9c914 100644
--- a/ydb/core/tx/datashard/datashard.cpp
+++ b/ydb/core/tx/datashard/datashard.cpp
@@ -467,7 +467,8 @@ void TDataShard::PersistChangeRecord(NIceDb::TNiceDb& db, const TChangeRecord& r
NIceDb::TUpdate<Schema::ChangeRecords::TxId>(record.GetTxId()),
NIceDb::TUpdate<Schema::ChangeRecords::PathOwnerId>(record.GetPathId().OwnerId),
NIceDb::TUpdate<Schema::ChangeRecords::LocalPathId>(record.GetPathId().LocalPathId),
- NIceDb::TUpdate<Schema::ChangeRecords::BodySize>(record.GetBody().size()));
+ NIceDb::TUpdate<Schema::ChangeRecords::BodySize>(record.GetBody().size()),
+ NIceDb::TUpdate<Schema::ChangeRecords::SchemaVersion>(record.GetSchemaVersion()));
db.Table<Schema::ChangeRecordDetails>().Key(record.GetOrder()).Update(
NIceDb::TUpdate<Schema::ChangeRecordDetails::Kind>(record.GetKind()),
NIceDb::TUpdate<Schema::ChangeRecordDetails::Body>(record.GetBody()));
diff --git a/ydb/core/tx/datashard/datashard_change_sending.cpp b/ydb/core/tx/datashard/datashard_change_sending.cpp
index 7789c5137e..0f51e31641 100644
--- a/ydb/core/tx/datashard/datashard_change_sending.cpp
+++ b/ydb/core/tx/datashard/datashard_change_sending.cpp
@@ -72,6 +72,7 @@ class TDataShard::TTxRequestChangeRecords: public TTransactionBase<TDataShard> {
basic.GetValue<Schema::ChangeRecords::PathOwnerId>(),
basic.GetValue<Schema::ChangeRecords::LocalPathId>()
))
+ .WithSchemaVersion(basic.GetValue<Schema::ChangeRecords::SchemaVersion>())
.WithBody(details.GetValue<Schema::ChangeRecordDetails::Body>())
.Build());
diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h
index 11313474f8..3056764635 100644
--- a/ydb/core/tx/datashard/datashard_impl.h
+++ b/ydb/core/tx/datashard/datashard_impl.h
@@ -652,9 +652,10 @@ class TDataShard
struct PathOwnerId : Column<5, NScheme::NTypeIds::Uint64> {};
struct LocalPathId : Column<6, NScheme::NTypeIds::Uint64> {};
struct BodySize : Column<7, NScheme::NTypeIds::Uint64> {};
+ struct SchemaVersion : Column<8, NScheme::NTypeIds::Uint64> {};
using TKey = TableKey<Order>;
- using TColumns = TableColumns<Order, Group, PlanStep, TxId, PathOwnerId, LocalPathId, BodySize>;
+ using TColumns = TableColumns<Order, Group, PlanStep, TxId, PathOwnerId, LocalPathId, BodySize, SchemaVersion>;
};
struct ChangeRecordDetails : Table<18> {
diff --git a/ydb/core/tx/datashard/datashard_ut_change_collector.cpp b/ydb/core/tx/datashard/datashard_ut_change_collector.cpp
index fc406f6f42..d94fff333b 100644
--- a/ydb/core/tx/datashard/datashard_ut_change_collector.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_change_collector.cpp
@@ -28,14 +28,14 @@ auto GetValueFromLocalDb(TTestActorRuntime& runtime, const TActorId& sender, ui6
auto GetChangeRecords(TTestActorRuntime& runtime, const TActorId& sender, ui64 tabletId) {
auto protoValue = GetValueFromLocalDb(runtime, sender, tabletId, R"((
(let range '( '('Order (Uint64 '0) (Void) )))
- (let columns '('Order 'Group 'PlanStep 'TxId 'PathOwnerId 'LocalPathId) )
+ (let columns '('Order 'Group 'PlanStep 'TxId 'PathOwnerId 'LocalPathId 'SchemaVersion) )
(let result (SelectRange 'ChangeRecords range columns '()))
(return (AsList (SetResult 'Result result) ))
))");
auto value = NClient::TValue::Create(protoValue);
const auto& result = value["Result"]["List"];
- TVector<std::tuple<ui64, ui64, ui64, ui64, TPathId>> records;
+ TVector<std::tuple<ui64, ui64, ui64, ui64, TPathId, ui64>> records;
for (size_t i = 0; i < result.Size(); ++i) {
const auto& item = result[i];
records.emplace_back(
@@ -43,7 +43,8 @@ auto GetChangeRecords(TTestActorRuntime& runtime, const TActorId& sender, ui64 t
item["Group"],
item["PlanStep"],
item["TxId"],
- TPathId(item["PathOwnerId"], item["LocalPathId"])
+ TPathId(item["PathOwnerId"], item["LocalPathId"]),
+ item["SchemaVersion"]
);
}
@@ -96,6 +97,8 @@ auto GetChangeRecordsWithDetails(TTestActorRuntime& runtime, const TActorId& sen
.WithGroup(std::get<1>(record))
.WithStep(std::get<2>(record))
.WithTxId(std::get<3>(record))
+ .WithPathId(std::get<4>(record))
+ .WithSchemaVersion(std::get<5>(record))
.WithBody(std::get<2>(detail))
.Build()
);
@@ -333,6 +336,7 @@ Y_UNIT_TEST_SUITE(AsyncIndexChangeCollector) {
UNIT_ASSERT_VALUES_EQUAL(expected.size(), actual.size());
for (size_t i = 0; i < expected.size(); ++i) {
UNIT_ASSERT_VALUES_EQUAL(expected.at(i), TStructRecord::Parse(actual.at(i).GetBody(), tagToName));
+ UNIT_ASSERT_VALUES_EQUAL(actual.at(i).GetSchemaVersion(), entry.TableId.SchemaVersion);
}
}
}
@@ -633,6 +637,7 @@ Y_UNIT_TEST_SUITE(CdcStreamChangeCollector) {
UNIT_ASSERT_VALUES_EQUAL(expected.size(), actual.size());
for (size_t i = 0; i < expected.size(); ++i) {
UNIT_ASSERT_VALUES_EQUAL(expected.at(i), TStructRecord::Parse(actual.at(i).GetBody(), tagToName));
+ UNIT_ASSERT_VALUES_EQUAL(actual.at(i).GetSchemaVersion(), entry.TableId.SchemaVersion);
}
}
}
diff --git a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_datashard_/flat_datashard.schema b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_datashard_/flat_datashard.schema
index 10d5f2ef24..90bf68adbe 100644
--- a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_datashard_/flat_datashard.schema
+++ b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_datashard_/flat_datashard.schema
@@ -972,11 +972,6 @@
],
"ColumnsAdded": [
{
- "ColumnId": 7,
- "ColumnName": "BodySize",
- "ColumnType": "Uint64"
- },
- {
"ColumnId": 1,
"ColumnName": "Order",
"ColumnType": "Uint64"
@@ -1005,19 +1000,30 @@
"ColumnId": 6,
"ColumnName": "LocalPathId",
"ColumnType": "Uint64"
+ },
+ {
+ "ColumnId": 7,
+ "ColumnName": "BodySize",
+ "ColumnType": "Uint64"
+ },
+ {
+ "ColumnId": 8,
+ "ColumnName": "SchemaVersion",
+ "ColumnType": "Uint64"
}
],
"ColumnsDropped": [],
"ColumnFamilies": {
"0": {
"Columns": [
- 7,
1,
2,
3,
4,
5,
- 6
+ 6,
+ 7,
+ 8
],
"RoomID": 0,
"Codec": 0,