aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorIlnaz Nizametdinov <ilnaz@ydb.tech>2024-11-12 12:09:21 +0300
committerGitHub <noreply@github.com>2024-11-12 12:09:21 +0300
commit5f2bf90bebedec6c74fc7f66002b35c585f8060e (patch)
treed9fbc731afc53ca3e222678a1121d7596bad9035
parent75519f11681f8e574b8cc3020c1aebb9727bc60a (diff)
downloadydb-5f2bf90bebedec6c74fc7f66002b35c585f8060e.tar.gz
Break persistent locks on scheme tx (#11493)
-rw-r--r--ydb/core/tx/datashard/alter_table_unit.cpp4
-rw-r--r--ydb/core/tx/datashard/create_cdc_stream_unit.cpp4
-rw-r--r--ydb/core/tx/datashard/datashard.cpp5
-rw-r--r--ydb/core/tx/datashard/datashard_impl.h5
-rw-r--r--ydb/core/tx/datashard/datashard_ut_change_exchange.cpp38
-rw-r--r--ydb/core/tx/datashard/drop_cdc_stream_unit.cpp4
-rw-r--r--ydb/core/tx/datashard/drop_index_notice_unit.cpp4
-rw-r--r--ydb/core/tx/datashard/initiate_build_index_unit.cpp4
8 files changed, 59 insertions, 9 deletions
diff --git a/ydb/core/tx/datashard/alter_table_unit.cpp b/ydb/core/tx/datashard/alter_table_unit.cpp
index 59697e00079..60167f66252 100644
--- a/ydb/core/tx/datashard/alter_table_unit.cpp
+++ b/ydb/core/tx/datashard/alter_table_unit.cpp
@@ -1,4 +1,5 @@
#include "datashard_impl.h"
+#include "datashard_locks_db.h"
#include "datashard_pipeline.h"
#include "execution_unit_ctors.h"
@@ -151,7 +152,8 @@ EExecutionStatus TAlterTableUnit::Execute(TOperation::TPtr op,
}
TUserTable::TPtr info = DataShard.AlterUserTable(ctx, txc, alterTableTx);
- DataShard.AddUserTable(tableId, info);
+ TDataShardLocksDb locksDb(DataShard, txc);
+ DataShard.AddUserTable(tableId, info, &locksDb);
if (info->NeedSchemaSnapshots()) {
DataShard.AddSchemaSnapshot(tableId, version, op->GetStep(), op->GetTxId(), txc, ctx);
diff --git a/ydb/core/tx/datashard/create_cdc_stream_unit.cpp b/ydb/core/tx/datashard/create_cdc_stream_unit.cpp
index 1f690063de5..67b31fbdc6b 100644
--- a/ydb/core/tx/datashard/create_cdc_stream_unit.cpp
+++ b/ydb/core/tx/datashard/create_cdc_stream_unit.cpp
@@ -1,4 +1,5 @@
#include "datashard_impl.h"
+#include "datashard_locks_db.h"
#include "datashard_pipeline.h"
#include "execution_unit_ctors.h"
@@ -43,7 +44,8 @@ public:
Y_ABORT_UNLESS(version);
auto tableInfo = DataShard.AlterTableAddCdcStream(ctx, txc, pathId, version, streamDesc);
- DataShard.AddUserTable(pathId, tableInfo);
+ TDataShardLocksDb locksDb(DataShard, txc);
+ DataShard.AddUserTable(pathId, tableInfo, &locksDb);
if (tableInfo->NeedSchemaSnapshots()) {
DataShard.AddSchemaSnapshot(pathId, version, op->GetStep(), op->GetTxId(), txc, ctx);
diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp
index 17cc5f57020..6771f4517ec 100644
--- a/ydb/core/tx/datashard/datashard.cpp
+++ b/ydb/core/tx/datashard/datashard.cpp
@@ -1890,7 +1890,6 @@ TUserTable::TPtr TDataShard::MoveUserTable(TOperation::TPtr op, const NKikimrTxD
newTableInfo->StatsNeedUpdate = true;
TDataShardLocksDb locksDb(*this, txc);
-
RemoveUserTable(prevId, &locksDb);
AddUserTable(newId, newTableInfo);
@@ -1967,8 +1966,8 @@ TUserTable::TPtr TDataShard::MoveUserIndex(TOperation::TPtr op, const NKikimrTxD
}
newTableInfo->SetSchema(schema);
-
- AddUserTable(pathId, newTableInfo);
+ TDataShardLocksDb locksDb(*this, txc);
+ AddUserTable(pathId, newTableInfo, &locksDb);
if (newTableInfo->NeedSchemaSnapshots()) {
AddSchemaSnapshot(pathId, version, op->GetStep(), op->GetTxId(), txc, ctx);
diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h
index f103724450a..a5c275c22f0 100644
--- a/ydb/core/tx/datashard/datashard_impl.h
+++ b/ydb/core/tx/datashard/datashard_impl.h
@@ -1632,7 +1632,10 @@ public:
TableInfos.erase(tableId.LocalPathId);
}
- void AddUserTable(const TPathId& tableId, TUserTable::TPtr tableInfo) {
+ void AddUserTable(const TPathId& tableId, TUserTable::TPtr tableInfo, ILocksDb* locksDb = nullptr) {
+ if (locksDb) {
+ SysLocks.RemoveSchema(tableId, locksDb);
+ }
TableInfos[tableId.LocalPathId] = tableInfo;
SysLocks.UpdateSchema(tableId, tableInfo->KeyColumnTypes);
Pipeline.GetDepTracker().UpdateSchema(tableId, *tableInfo);
diff --git a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
index 40614b6abe7..e3e61412efc 100644
--- a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
@@ -3868,6 +3868,44 @@ Y_UNIT_TEST_SUITE(Cdc) {
MustNotLoseSchemaSnapshot(true);
}
+ Y_UNIT_TEST(ShouldBreakLocksOnConcurrentSchemeTx) {
+ TPortManager portManager;
+ TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig())
+ .SetUseRealThreads(false)
+ .SetDomainName("Root")
+ );
+
+ auto& runtime = *server->GetRuntime();
+ const auto edgeActor = runtime.AllocateEdgeActor();
+
+ SetupLogging(runtime);
+ InitRoot(server, edgeActor);
+ CreateShardedTable(server, edgeActor, "/Root", "Table", SimpleTable());
+
+ WaitTxNotification(server, edgeActor, AsyncAlterAddStream(server, "/Root", "Table",
+ Updates(NKikimrSchemeOp::ECdcStreamFormatJson)));
+
+ ExecSQL(server, edgeActor, "UPSERT INTO `/Root/Table` (key, value) VALUES (1, 10);");
+
+ TString sessionId;
+ TString txId;
+ KqpSimpleBegin(runtime, sessionId, txId, "UPSERT INTO `/Root/Table` (key, value) VALUES (1, 11);");
+
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleContinue(runtime, sessionId, txId, "SELECT key, value FROM `/Root/Table`;"),
+ "{ items { uint32_value: 1 } items { uint32_value: 11 } }");
+
+ WaitTxNotification(server, edgeActor, AsyncAlterAddExtraColumn(server, "/Root", "Table"));
+
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleCommit(runtime, sessionId, txId, "SELECT 1;"),
+ "ERROR: ABORTED");
+
+ WaitForContent(server, edgeActor, "/Root/Table/Stream", {
+ R"({"update":{"value":10},"key":[1]})",
+ });
+ }
+
Y_UNIT_TEST(ResolvedTimestampsContinueAfterMerge) {
TPortManager portManager;
TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig())
diff --git a/ydb/core/tx/datashard/drop_cdc_stream_unit.cpp b/ydb/core/tx/datashard/drop_cdc_stream_unit.cpp
index 359bdd455f1..64fcf486346 100644
--- a/ydb/core/tx/datashard/drop_cdc_stream_unit.cpp
+++ b/ydb/core/tx/datashard/drop_cdc_stream_unit.cpp
@@ -1,4 +1,5 @@
#include "datashard_impl.h"
+#include "datashard_locks_db.h"
#include "datashard_pipeline.h"
#include "execution_unit_ctors.h"
@@ -40,7 +41,8 @@ public:
Y_ABORT_UNLESS(version);
auto tableInfo = DataShard.AlterTableDropCdcStream(ctx, txc, pathId, version, streamPathId);
- DataShard.AddUserTable(pathId, tableInfo);
+ TDataShardLocksDb locksDb(DataShard, txc);
+ DataShard.AddUserTable(pathId, tableInfo, &locksDb);
if (tableInfo->NeedSchemaSnapshots()) {
DataShard.AddSchemaSnapshot(pathId, version, op->GetStep(), op->GetTxId(), txc, ctx);
diff --git a/ydb/core/tx/datashard/drop_index_notice_unit.cpp b/ydb/core/tx/datashard/drop_index_notice_unit.cpp
index d1df5149c41..6d45bd13a2a 100644
--- a/ydb/core/tx/datashard/drop_index_notice_unit.cpp
+++ b/ydb/core/tx/datashard/drop_index_notice_unit.cpp
@@ -1,4 +1,5 @@
#include "datashard_impl.h"
+#include "datashard_locks_db.h"
#include "datashard_pipeline.h"
#include "execution_unit_ctors.h"
@@ -52,7 +53,8 @@ public:
}
Y_ABORT_UNLESS(tableInfo);
- DataShard.AddUserTable(pathId, tableInfo);
+ TDataShardLocksDb locksDb(DataShard, txc);
+ DataShard.AddUserTable(pathId, tableInfo, &locksDb);
if (tableInfo->NeedSchemaSnapshots()) {
DataShard.AddSchemaSnapshot(pathId, version, op->GetStep(), op->GetTxId(), txc, ctx);
diff --git a/ydb/core/tx/datashard/initiate_build_index_unit.cpp b/ydb/core/tx/datashard/initiate_build_index_unit.cpp
index a2ac702a739..d0bc48a8aa4 100644
--- a/ydb/core/tx/datashard/initiate_build_index_unit.cpp
+++ b/ydb/core/tx/datashard/initiate_build_index_unit.cpp
@@ -1,4 +1,5 @@
#include "datashard_impl.h"
+#include "datashard_locks_db.h"
#include "datashard_pipeline.h"
#include "execution_unit_ctors.h"
@@ -53,7 +54,8 @@ public:
}
Y_ABORT_UNLESS(tableInfo);
- DataShard.AddUserTable(pathId, tableInfo);
+ TDataShardLocksDb locksDb(DataShard, txc);
+ DataShard.AddUserTable(pathId, tableInfo, &locksDb);
if (tableInfo->NeedSchemaSnapshots()) {
DataShard.AddSchemaSnapshot(pathId, version, op->GetStep(), op->GetTxId(), txc, ctx);