diff options
author | Ilnaz Nizametdinov <ilnaz@ydb.tech> | 2024-11-12 12:09:21 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-11-12 12:09:21 +0300 |
commit | 5f2bf90bebedec6c74fc7f66002b35c585f8060e (patch) | |
tree | d9fbc731afc53ca3e222678a1121d7596bad9035 | |
parent | 75519f11681f8e574b8cc3020c1aebb9727bc60a (diff) | |
download | ydb-5f2bf90bebedec6c74fc7f66002b35c585f8060e.tar.gz |
Break persistent locks on scheme tx (#11493)
-rw-r--r-- | ydb/core/tx/datashard/alter_table_unit.cpp | 4 | ||||
-rw-r--r-- | ydb/core/tx/datashard/create_cdc_stream_unit.cpp | 4 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard.cpp | 5 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_impl.h | 5 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_change_exchange.cpp | 38 | ||||
-rw-r--r-- | ydb/core/tx/datashard/drop_cdc_stream_unit.cpp | 4 | ||||
-rw-r--r-- | ydb/core/tx/datashard/drop_index_notice_unit.cpp | 4 | ||||
-rw-r--r-- | ydb/core/tx/datashard/initiate_build_index_unit.cpp | 4 |
8 files changed, 59 insertions, 9 deletions
diff --git a/ydb/core/tx/datashard/alter_table_unit.cpp b/ydb/core/tx/datashard/alter_table_unit.cpp index 59697e00079..60167f66252 100644 --- a/ydb/core/tx/datashard/alter_table_unit.cpp +++ b/ydb/core/tx/datashard/alter_table_unit.cpp @@ -1,4 +1,5 @@ #include "datashard_impl.h" +#include "datashard_locks_db.h" #include "datashard_pipeline.h" #include "execution_unit_ctors.h" @@ -151,7 +152,8 @@ EExecutionStatus TAlterTableUnit::Execute(TOperation::TPtr op, } TUserTable::TPtr info = DataShard.AlterUserTable(ctx, txc, alterTableTx); - DataShard.AddUserTable(tableId, info); + TDataShardLocksDb locksDb(DataShard, txc); + DataShard.AddUserTable(tableId, info, &locksDb); if (info->NeedSchemaSnapshots()) { DataShard.AddSchemaSnapshot(tableId, version, op->GetStep(), op->GetTxId(), txc, ctx); diff --git a/ydb/core/tx/datashard/create_cdc_stream_unit.cpp b/ydb/core/tx/datashard/create_cdc_stream_unit.cpp index 1f690063de5..67b31fbdc6b 100644 --- a/ydb/core/tx/datashard/create_cdc_stream_unit.cpp +++ b/ydb/core/tx/datashard/create_cdc_stream_unit.cpp @@ -1,4 +1,5 @@ #include "datashard_impl.h" +#include "datashard_locks_db.h" #include "datashard_pipeline.h" #include "execution_unit_ctors.h" @@ -43,7 +44,8 @@ public: Y_ABORT_UNLESS(version); auto tableInfo = DataShard.AlterTableAddCdcStream(ctx, txc, pathId, version, streamDesc); - DataShard.AddUserTable(pathId, tableInfo); + TDataShardLocksDb locksDb(DataShard, txc); + DataShard.AddUserTable(pathId, tableInfo, &locksDb); if (tableInfo->NeedSchemaSnapshots()) { DataShard.AddSchemaSnapshot(pathId, version, op->GetStep(), op->GetTxId(), txc, ctx); diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp index 17cc5f57020..6771f4517ec 100644 --- a/ydb/core/tx/datashard/datashard.cpp +++ b/ydb/core/tx/datashard/datashard.cpp @@ -1890,7 +1890,6 @@ TUserTable::TPtr TDataShard::MoveUserTable(TOperation::TPtr op, const NKikimrTxD newTableInfo->StatsNeedUpdate = true; TDataShardLocksDb locksDb(*this, txc); - RemoveUserTable(prevId, &locksDb); AddUserTable(newId, newTableInfo); @@ -1967,8 +1966,8 @@ TUserTable::TPtr TDataShard::MoveUserIndex(TOperation::TPtr op, const NKikimrTxD } newTableInfo->SetSchema(schema); - - AddUserTable(pathId, newTableInfo); + TDataShardLocksDb locksDb(*this, txc); + AddUserTable(pathId, newTableInfo, &locksDb); if (newTableInfo->NeedSchemaSnapshots()) { AddSchemaSnapshot(pathId, version, op->GetStep(), op->GetTxId(), txc, ctx); diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h index f103724450a..a5c275c22f0 100644 --- a/ydb/core/tx/datashard/datashard_impl.h +++ b/ydb/core/tx/datashard/datashard_impl.h @@ -1632,7 +1632,10 @@ public: TableInfos.erase(tableId.LocalPathId); } - void AddUserTable(const TPathId& tableId, TUserTable::TPtr tableInfo) { + void AddUserTable(const TPathId& tableId, TUserTable::TPtr tableInfo, ILocksDb* locksDb = nullptr) { + if (locksDb) { + SysLocks.RemoveSchema(tableId, locksDb); + } TableInfos[tableId.LocalPathId] = tableInfo; SysLocks.UpdateSchema(tableId, tableInfo->KeyColumnTypes); Pipeline.GetDepTracker().UpdateSchema(tableId, *tableInfo); diff --git a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp index 40614b6abe7..e3e61412efc 100644 --- a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp +++ b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp @@ -3868,6 +3868,44 @@ Y_UNIT_TEST_SUITE(Cdc) { MustNotLoseSchemaSnapshot(true); } + Y_UNIT_TEST(ShouldBreakLocksOnConcurrentSchemeTx) { + TPortManager portManager; + TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig()) + .SetUseRealThreads(false) + .SetDomainName("Root") + ); + + auto& runtime = *server->GetRuntime(); + const auto edgeActor = runtime.AllocateEdgeActor(); + + SetupLogging(runtime); + InitRoot(server, edgeActor); + CreateShardedTable(server, edgeActor, "/Root", "Table", SimpleTable()); + + WaitTxNotification(server, edgeActor, AsyncAlterAddStream(server, "/Root", "Table", + Updates(NKikimrSchemeOp::ECdcStreamFormatJson))); + + ExecSQL(server, edgeActor, "UPSERT INTO `/Root/Table` (key, value) VALUES (1, 10);"); + + TString sessionId; + TString txId; + KqpSimpleBegin(runtime, sessionId, txId, "UPSERT INTO `/Root/Table` (key, value) VALUES (1, 11);"); + + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleContinue(runtime, sessionId, txId, "SELECT key, value FROM `/Root/Table`;"), + "{ items { uint32_value: 1 } items { uint32_value: 11 } }"); + + WaitTxNotification(server, edgeActor, AsyncAlterAddExtraColumn(server, "/Root", "Table")); + + UNIT_ASSERT_VALUES_EQUAL( + KqpSimpleCommit(runtime, sessionId, txId, "SELECT 1;"), + "ERROR: ABORTED"); + + WaitForContent(server, edgeActor, "/Root/Table/Stream", { + R"({"update":{"value":10},"key":[1]})", + }); + } + Y_UNIT_TEST(ResolvedTimestampsContinueAfterMerge) { TPortManager portManager; TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig()) diff --git a/ydb/core/tx/datashard/drop_cdc_stream_unit.cpp b/ydb/core/tx/datashard/drop_cdc_stream_unit.cpp index 359bdd455f1..64fcf486346 100644 --- a/ydb/core/tx/datashard/drop_cdc_stream_unit.cpp +++ b/ydb/core/tx/datashard/drop_cdc_stream_unit.cpp @@ -1,4 +1,5 @@ #include "datashard_impl.h" +#include "datashard_locks_db.h" #include "datashard_pipeline.h" #include "execution_unit_ctors.h" @@ -40,7 +41,8 @@ public: Y_ABORT_UNLESS(version); auto tableInfo = DataShard.AlterTableDropCdcStream(ctx, txc, pathId, version, streamPathId); - DataShard.AddUserTable(pathId, tableInfo); + TDataShardLocksDb locksDb(DataShard, txc); + DataShard.AddUserTable(pathId, tableInfo, &locksDb); if (tableInfo->NeedSchemaSnapshots()) { DataShard.AddSchemaSnapshot(pathId, version, op->GetStep(), op->GetTxId(), txc, ctx); diff --git a/ydb/core/tx/datashard/drop_index_notice_unit.cpp b/ydb/core/tx/datashard/drop_index_notice_unit.cpp index d1df5149c41..6d45bd13a2a 100644 --- a/ydb/core/tx/datashard/drop_index_notice_unit.cpp +++ b/ydb/core/tx/datashard/drop_index_notice_unit.cpp @@ -1,4 +1,5 @@ #include "datashard_impl.h" +#include "datashard_locks_db.h" #include "datashard_pipeline.h" #include "execution_unit_ctors.h" @@ -52,7 +53,8 @@ public: } Y_ABORT_UNLESS(tableInfo); - DataShard.AddUserTable(pathId, tableInfo); + TDataShardLocksDb locksDb(DataShard, txc); + DataShard.AddUserTable(pathId, tableInfo, &locksDb); if (tableInfo->NeedSchemaSnapshots()) { DataShard.AddSchemaSnapshot(pathId, version, op->GetStep(), op->GetTxId(), txc, ctx); diff --git a/ydb/core/tx/datashard/initiate_build_index_unit.cpp b/ydb/core/tx/datashard/initiate_build_index_unit.cpp index a2ac702a739..d0bc48a8aa4 100644 --- a/ydb/core/tx/datashard/initiate_build_index_unit.cpp +++ b/ydb/core/tx/datashard/initiate_build_index_unit.cpp @@ -1,4 +1,5 @@ #include "datashard_impl.h" +#include "datashard_locks_db.h" #include "datashard_pipeline.h" #include "execution_unit_ctors.h" @@ -53,7 +54,8 @@ public: } Y_ABORT_UNLESS(tableInfo); - DataShard.AddUserTable(pathId, tableInfo); + TDataShardLocksDb locksDb(DataShard, txc); + DataShard.AddUserTable(pathId, tableInfo, &locksDb); if (tableInfo->NeedSchemaSnapshots()) { DataShard.AddSchemaSnapshot(pathId, version, op->GetStep(), op->GetTxId(), txc, ctx); |