diff options
author | Ilnaz Nizametdinov <i.nizametdinov@gmail.com> | 2022-02-24 19:11:01 +0300 |
---|---|---|
committer | Ilnaz Nizametdinov <i.nizametdinov@gmail.com> | 2022-02-24 19:11:01 +0300 |
commit | b3cb262c728243a07046e49f1715b055e8bde804 (patch) | |
tree | 561e68f724193f10a2f84a803be79e91591b5a11 | |
parent | f3629d0764f1951c227f697c01f92239049902f0 (diff) | |
download | ydb-b3cb262c728243a07046e49f1715b055e8bde804.tar.gz |
Persist schema version KIKIMR-14198
ref:83395efca7ef8489f23048503f32f686cdb78262
13 files changed, 61 insertions, 30 deletions
diff --git a/ydb/core/tx/datashard/change_collector_async_index.cpp b/ydb/core/tx/datashard/change_collector_async_index.cpp index 6d6c068b11..992014280e 100644 --- a/ydb/core/tx/datashard/change_collector_async_index.cpp +++ b/ydb/core/tx/datashard/change_collector_async_index.cpp @@ -135,7 +135,7 @@ bool TAsyncIndexChangeCollector::Collect(const TTableId& tableId, ERowOp rop, } if (needDeletion) { - Persist(pathId, ERowOp::Erase, IndexKeyVals, IndexKeyTags, {}); + Persist(tableId, pathId, ERowOp::Erase, IndexKeyVals, IndexKeyTags, {}); } Clear(); @@ -178,7 +178,7 @@ bool TAsyncIndexChangeCollector::Collect(const TTableId& tableId, ERowOp rop, } if (needUpdate) { - Persist(pathId, ERowOp::Upsert, IndexKeyVals, IndexKeyTags, IndexDataVals); + Persist(tableId, pathId, ERowOp::Upsert, IndexKeyVals, IndexKeyTags, IndexDataVals); } Clear(); @@ -283,12 +283,12 @@ void TAsyncIndexChangeCollector::FillDataWithNull(TTag tag, NScheme::TTypeId typ IndexDataVals.emplace_back(tag, ECellOp::Set, TRawTypeValue({}, type)); } -void TAsyncIndexChangeCollector::Persist(const TPathId& pathId, ERowOp rop, +void TAsyncIndexChangeCollector::Persist(const TTableId& tableId, const TPathId& pathId, ERowOp rop, TArrayRef<const TRawTypeValue> key, TArrayRef<const TTag> keyTags, TArrayRef<const TUpdateOp> updates) { NKikimrChangeExchange::TChangeRecord::TDataChange body; Serialize(body, rop, key, keyTags, updates); - TBaseChangeCollector::Persist(TChangeRecord::EKind::AsyncIndex, pathId, body); + TBaseChangeCollector::Persist(tableId, pathId, TChangeRecord::EKind::AsyncIndex, body); } void TAsyncIndexChangeCollector::Clear() { diff --git a/ydb/core/tx/datashard/change_collector_async_index.h b/ydb/core/tx/datashard/change_collector_async_index.h index d908dc7ef2..9160f6d899 100644 --- a/ydb/core/tx/datashard/change_collector_async_index.h +++ b/ydb/core/tx/datashard/change_collector_async_index.h @@ -42,7 +42,7 @@ class TAsyncIndexChangeCollector: public TBaseChangeCollector { void FillDataFromUpdate(NTable::TTag tag, NTable::TPos pos, TArrayRef<const NTable::TUpdateOp> updates); void FillDataWithNull(NTable::TTag tag, NScheme::TTypeId type); - void Persist(const TPathId& pathId, NTable::ERowOp rop, + void Persist(const TTableId& tableId, const TPathId& pathId, NTable::ERowOp rop, TArrayRef<const TRawTypeValue> key, TArrayRef<const NTable::TTag> keyTags, TArrayRef<const NTable::TUpdateOp> updates); diff --git a/ydb/core/tx/datashard/change_collector_base.cpp b/ydb/core/tx/datashard/change_collector_base.cpp index 1df9173f1f..779d0ce443 100644 --- a/ydb/core/tx/datashard/change_collector_base.cpp +++ b/ydb/core/tx/datashard/change_collector_base.cpp @@ -122,19 +122,28 @@ void TBaseChangeCollector::Serialize(TDataChange& out, ERowOp rop, } } -void TBaseChangeCollector::Persist(TChangeRecord::EKind kind, const TPathId& pathId, const TDataChange& body) { +void TBaseChangeCollector::Persist( + const TTableId& tableId, // origin table + const TPathId& pathId, // target object (table, stream, etc...) + TChangeRecord::EKind kind, const TDataChange& body) +{ NIceDb::TNiceDb db(Db); if (!Group) { Group = Self->AllocateChangeRecordGroup(db); } + Y_VERIFY_S(Self->IsUserTable(tableId), "Unknown table: " << tableId); + auto userTable = Self->GetUserTables().at(tableId.PathId.LocalPathId); + Y_VERIFY(userTable->GetTableSchemaVersion()); + auto record = TChangeRecordBuilder(kind) .WithOrder(Self->AllocateChangeRecordOrder(db)) .WithGroup(*Group) .WithStep(WriteVersion.Step) .WithTxId(WriteVersion.TxId) .WithPathId(pathId) + .WithSchemaVersion(userTable->GetTableSchemaVersion()) .WithBody(body.SerializeAsString()) .Build(); diff --git a/ydb/core/tx/datashard/change_collector_base.h b/ydb/core/tx/datashard/change_collector_base.h index bb04deb29c..1e582ac74e 100644 --- a/ydb/core/tx/datashard/change_collector_base.h +++ b/ydb/core/tx/datashard/change_collector_base.h @@ -28,7 +28,7 @@ protected: TArrayRef<const TRawTypeValue> key, TArrayRef<const NTable::TTag> keyTags, const NTable::TRowState* oldState, const NTable::TRowState* newState, TArrayRef<const NTable::TTag> valueTags); - void Persist(TChangeRecord::EKind kind, const TPathId& pathId, const TDataChange& body); + void Persist(const TTableId& tableId, const TPathId& pathId, TChangeRecord::EKind kind, const TDataChange& body); public: explicit TBaseChangeCollector(TDataShard* self, NTable::TDatabase& db, bool isImmediateTx); diff --git a/ydb/core/tx/datashard/change_collector_cdc_stream.cpp b/ydb/core/tx/datashard/change_collector_cdc_stream.cpp index f5cb67d1ba..4fc67adc6e 100644 --- a/ydb/core/tx/datashard/change_collector_cdc_stream.cpp +++ b/ydb/core/tx/datashard/change_collector_cdc_stream.cpp @@ -115,10 +115,10 @@ bool TCdcStreamChangeCollector::Collect(const TTableId& tableId, ERowOp rop, for (const auto& [pathId, stream] : userTable->CdcStreams) { switch (stream.Mode) { case NKikimrSchemeOp::ECdcStreamModeKeysOnly: - Persist(pathId, rop, key, keyTags, {}); + Persist(tableId, pathId, rop, key, keyTags, {}); break; case NKikimrSchemeOp::ECdcStreamModeUpdate: - Persist(pathId, rop, key, keyTags, updates); + Persist(tableId, pathId, rop, key, keyTags, updates); break; case NKikimrSchemeOp::ECdcStreamModeNewImage: case NKikimrSchemeOp::ECdcStreamModeOldImage: @@ -131,14 +131,14 @@ bool TCdcStreamChangeCollector::Collect(const TTableId& tableId, ERowOp rop, } if (stream.Mode == NKikimrSchemeOp::ECdcStreamModeOldImage) { - Persist(pathId, rop, key, keyTags, NullIfErased(&*oldState), nullptr, valueTags); + Persist(tableId, pathId, rop, key, keyTags, NullIfErased(&*oldState), nullptr, valueTags); } else { const auto newState = PatchState(*oldState, rop, MakeTagToPos(valueTags), MappedUpdates(updates)); if (stream.Mode == NKikimrSchemeOp::ECdcStreamModeNewImage) { - Persist(pathId, rop, key, keyTags, nullptr, NullIfErased(&newState), valueTags); + Persist(tableId, pathId, rop, key, keyTags, nullptr, NullIfErased(&newState), valueTags); } else { - Persist(pathId, rop, key, keyTags, NullIfErased(&*oldState), NullIfErased(&newState), valueTags); + Persist(tableId, pathId, rop, key, keyTags, NullIfErased(&*oldState), NullIfErased(&newState), valueTags); } } @@ -202,15 +202,15 @@ TRowState TCdcStreamChangeCollector::PatchState(const TRowState& oldState, ERowO return newState; } -void TCdcStreamChangeCollector::Persist(const TPathId& pathId, ERowOp rop, +void TCdcStreamChangeCollector::Persist(const TTableId& tableId, const TPathId& pathId, ERowOp rop, TArrayRef<const TRawTypeValue> key, TArrayRef<const TTag> keyTags, TArrayRef<const TUpdateOp> updates) { NKikimrChangeExchange::TChangeRecord::TDataChange body; Serialize(body, rop, key, keyTags, updates); - TBaseChangeCollector::Persist(TChangeRecord::EKind::CdcDataChange, pathId, body); + TBaseChangeCollector::Persist(tableId, pathId, TChangeRecord::EKind::CdcDataChange, body); } -void TCdcStreamChangeCollector::Persist(const TPathId& pathId, ERowOp rop, +void TCdcStreamChangeCollector::Persist(const TTableId& tableId, const TPathId& pathId, ERowOp rop, TArrayRef<const TRawTypeValue> key, TArrayRef<const TTag> keyTags, const TRowState* oldState, const TRowState* newState, TArrayRef<const TTag> valueTags) { @@ -220,7 +220,7 @@ void TCdcStreamChangeCollector::Persist(const TPathId& pathId, ERowOp rop, NKikimrChangeExchange::TChangeRecord::TDataChange body; Serialize(body, rop, key, keyTags, oldState, newState, valueTags); - TBaseChangeCollector::Persist(TChangeRecord::EKind::CdcDataChange, pathId, body); + TBaseChangeCollector::Persist(tableId, pathId, TChangeRecord::EKind::CdcDataChange, body); } } // NDataShard diff --git a/ydb/core/tx/datashard/change_collector_cdc_stream.h b/ydb/core/tx/datashard/change_collector_cdc_stream.h index e02382d5db..631a91c24e 100644 --- a/ydb/core/tx/datashard/change_collector_cdc_stream.h +++ b/ydb/core/tx/datashard/change_collector_cdc_stream.h @@ -13,9 +13,9 @@ class TCdcStreamChangeCollector: public TBaseChangeCollector { static NTable::TRowState PatchState(const NTable::TRowState& oldState, NTable::ERowOp rop, const THashMap<NTable::TTag, NTable::TPos>& tagToPos, const THashMap<NTable::TTag, NTable::TUpdateOp>& updates); - void Persist(const TPathId& pathId, NTable::ERowOp rop, + void Persist(const TTableId& tableId, const TPathId& pathId, NTable::ERowOp rop, TArrayRef<const TRawTypeValue> key, TArrayRef<const NTable::TTag> keyTags, TArrayRef<const NTable::TUpdateOp> updates); - void Persist(const TPathId& pathId, NTable::ERowOp rop, + void Persist(const TTableId& tableId, const TPathId& pathId, NTable::ERowOp rop, TArrayRef<const TRawTypeValue> key, TArrayRef<const NTable::TTag> keyTags, const NTable::TRowState* oldState, const NTable::TRowState* newState, TArrayRef<const NTable::TTag> valueTags); diff --git a/ydb/core/tx/datashard/change_record.cpp b/ydb/core/tx/datashard/change_record.cpp index 661df31fb0..f191d3c5b0 100644 --- a/ydb/core/tx/datashard/change_record.cpp +++ b/ydb/core/tx/datashard/change_record.cpp @@ -103,6 +103,11 @@ TChangeRecordBuilder& TChangeRecordBuilder::WithPathId(const TPathId& pathId) { return *this; } +TChangeRecordBuilder& TChangeRecordBuilder::WithSchemaVersion(ui64 version) { + Record.SchemaVersion = version; + return *this; +} + TChangeRecordBuilder& TChangeRecordBuilder::WithBody(const TString& body) { Record.Body = body; return *this; diff --git a/ydb/core/tx/datashard/change_record.h b/ydb/core/tx/datashard/change_record.h index 7c88bb8b66..96b0ceb4f4 100644 --- a/ydb/core/tx/datashard/change_record.h +++ b/ydb/core/tx/datashard/change_record.h @@ -30,6 +30,7 @@ public: ui64 GetStep() const { return Step; } ui64 GetTxId() const { return TxId; } const TPathId& GetPathId() const { return PathId; } + ui64 GetSchemaVersion() const { return SchemaVersion; } EKind GetKind() const { return Kind; } const TString& GetBody() const { return Body; } i64 GetSeqNo() const; @@ -46,6 +47,7 @@ private: ui64 Step; ui64 TxId; TPathId PathId; + ui64 SchemaVersion; EKind Kind; TString Body; @@ -64,6 +66,7 @@ public: TChangeRecordBuilder& WithStep(ui64 step); TChangeRecordBuilder& WithTxId(ui64 txId); TChangeRecordBuilder& WithPathId(const TPathId& pathId); + TChangeRecordBuilder& WithSchemaVersion(ui64 version); TChangeRecordBuilder& WithBody(const TString& body); TChangeRecordBuilder& WithBody(TString&& body); diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp index d86568b4a1..e636d9c914 100644 --- a/ydb/core/tx/datashard/datashard.cpp +++ b/ydb/core/tx/datashard/datashard.cpp @@ -467,7 +467,8 @@ void TDataShard::PersistChangeRecord(NIceDb::TNiceDb& db, const TChangeRecord& r NIceDb::TUpdate<Schema::ChangeRecords::TxId>(record.GetTxId()), NIceDb::TUpdate<Schema::ChangeRecords::PathOwnerId>(record.GetPathId().OwnerId), NIceDb::TUpdate<Schema::ChangeRecords::LocalPathId>(record.GetPathId().LocalPathId), - NIceDb::TUpdate<Schema::ChangeRecords::BodySize>(record.GetBody().size())); + NIceDb::TUpdate<Schema::ChangeRecords::BodySize>(record.GetBody().size()), + NIceDb::TUpdate<Schema::ChangeRecords::SchemaVersion>(record.GetSchemaVersion())); db.Table<Schema::ChangeRecordDetails>().Key(record.GetOrder()).Update( NIceDb::TUpdate<Schema::ChangeRecordDetails::Kind>(record.GetKind()), NIceDb::TUpdate<Schema::ChangeRecordDetails::Body>(record.GetBody())); diff --git a/ydb/core/tx/datashard/datashard_change_sending.cpp b/ydb/core/tx/datashard/datashard_change_sending.cpp index 7789c5137e..0f51e31641 100644 --- a/ydb/core/tx/datashard/datashard_change_sending.cpp +++ b/ydb/core/tx/datashard/datashard_change_sending.cpp @@ -72,6 +72,7 @@ class TDataShard::TTxRequestChangeRecords: public TTransactionBase<TDataShard> { basic.GetValue<Schema::ChangeRecords::PathOwnerId>(), basic.GetValue<Schema::ChangeRecords::LocalPathId>() )) + .WithSchemaVersion(basic.GetValue<Schema::ChangeRecords::SchemaVersion>()) .WithBody(details.GetValue<Schema::ChangeRecordDetails::Body>()) .Build()); diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h index 11313474f8..3056764635 100644 --- a/ydb/core/tx/datashard/datashard_impl.h +++ b/ydb/core/tx/datashard/datashard_impl.h @@ -652,9 +652,10 @@ class TDataShard struct PathOwnerId : Column<5, NScheme::NTypeIds::Uint64> {}; struct LocalPathId : Column<6, NScheme::NTypeIds::Uint64> {}; struct BodySize : Column<7, NScheme::NTypeIds::Uint64> {}; + struct SchemaVersion : Column<8, NScheme::NTypeIds::Uint64> {}; using TKey = TableKey<Order>; - using TColumns = TableColumns<Order, Group, PlanStep, TxId, PathOwnerId, LocalPathId, BodySize>; + using TColumns = TableColumns<Order, Group, PlanStep, TxId, PathOwnerId, LocalPathId, BodySize, SchemaVersion>; }; struct ChangeRecordDetails : Table<18> { diff --git a/ydb/core/tx/datashard/datashard_ut_change_collector.cpp b/ydb/core/tx/datashard/datashard_ut_change_collector.cpp index fc406f6f42..d94fff333b 100644 --- a/ydb/core/tx/datashard/datashard_ut_change_collector.cpp +++ b/ydb/core/tx/datashard/datashard_ut_change_collector.cpp @@ -28,14 +28,14 @@ auto GetValueFromLocalDb(TTestActorRuntime& runtime, const TActorId& sender, ui6 auto GetChangeRecords(TTestActorRuntime& runtime, const TActorId& sender, ui64 tabletId) { auto protoValue = GetValueFromLocalDb(runtime, sender, tabletId, R"(( (let range '( '('Order (Uint64 '0) (Void) ))) - (let columns '('Order 'Group 'PlanStep 'TxId 'PathOwnerId 'LocalPathId) ) + (let columns '('Order 'Group 'PlanStep 'TxId 'PathOwnerId 'LocalPathId 'SchemaVersion) ) (let result (SelectRange 'ChangeRecords range columns '())) (return (AsList (SetResult 'Result result) )) ))"); auto value = NClient::TValue::Create(protoValue); const auto& result = value["Result"]["List"]; - TVector<std::tuple<ui64, ui64, ui64, ui64, TPathId>> records; + TVector<std::tuple<ui64, ui64, ui64, ui64, TPathId, ui64>> records; for (size_t i = 0; i < result.Size(); ++i) { const auto& item = result[i]; records.emplace_back( @@ -43,7 +43,8 @@ auto GetChangeRecords(TTestActorRuntime& runtime, const TActorId& sender, ui64 t item["Group"], item["PlanStep"], item["TxId"], - TPathId(item["PathOwnerId"], item["LocalPathId"]) + TPathId(item["PathOwnerId"], item["LocalPathId"]), + item["SchemaVersion"] ); } @@ -96,6 +97,8 @@ auto GetChangeRecordsWithDetails(TTestActorRuntime& runtime, const TActorId& sen .WithGroup(std::get<1>(record)) .WithStep(std::get<2>(record)) .WithTxId(std::get<3>(record)) + .WithPathId(std::get<4>(record)) + .WithSchemaVersion(std::get<5>(record)) .WithBody(std::get<2>(detail)) .Build() ); @@ -333,6 +336,7 @@ Y_UNIT_TEST_SUITE(AsyncIndexChangeCollector) { UNIT_ASSERT_VALUES_EQUAL(expected.size(), actual.size()); for (size_t i = 0; i < expected.size(); ++i) { UNIT_ASSERT_VALUES_EQUAL(expected.at(i), TStructRecord::Parse(actual.at(i).GetBody(), tagToName)); + UNIT_ASSERT_VALUES_EQUAL(actual.at(i).GetSchemaVersion(), entry.TableId.SchemaVersion); } } } @@ -633,6 +637,7 @@ Y_UNIT_TEST_SUITE(CdcStreamChangeCollector) { UNIT_ASSERT_VALUES_EQUAL(expected.size(), actual.size()); for (size_t i = 0; i < expected.size(); ++i) { UNIT_ASSERT_VALUES_EQUAL(expected.at(i), TStructRecord::Parse(actual.at(i).GetBody(), tagToName)); + UNIT_ASSERT_VALUES_EQUAL(actual.at(i).GetSchemaVersion(), entry.TableId.SchemaVersion); } } } diff --git a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_datashard_/flat_datashard.schema b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_datashard_/flat_datashard.schema index 10d5f2ef24..90bf68adbe 100644 --- a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_datashard_/flat_datashard.schema +++ b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_datashard_/flat_datashard.schema @@ -972,11 +972,6 @@ ], "ColumnsAdded": [ { - "ColumnId": 7, - "ColumnName": "BodySize", - "ColumnType": "Uint64" - }, - { "ColumnId": 1, "ColumnName": "Order", "ColumnType": "Uint64" @@ -1005,19 +1000,30 @@ "ColumnId": 6, "ColumnName": "LocalPathId", "ColumnType": "Uint64" + }, + { + "ColumnId": 7, + "ColumnName": "BodySize", + "ColumnType": "Uint64" + }, + { + "ColumnId": 8, + "ColumnName": "SchemaVersion", + "ColumnType": "Uint64" } ], "ColumnsDropped": [], "ColumnFamilies": { "0": { "Columns": [ - 7, 1, 2, 3, 4, 5, - 6 + 6, + 7, + 8 ], "RoomID": 0, "Codec": 0, |