aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorIlnaz Nizametdinov <ilnaz@ydb.tech>2024-11-12 14:43:26 +0300
committerGitHub <noreply@github.com>2024-11-12 11:43:26 +0000
commit91cc64219e78a1b47be64f2b815027c1fcdd3f1c (patch)
treecba2597fae1ee14ba38d2a72ca9cc86193f82211
parenta431c945bbf0d0a8a1f57e9c552fc09021cde4ee (diff)
downloadydb-91cc64219e78a1b47be64f2b815027c1fcdd3f1c.tar.gz
break locks on scheme tx (#11517)
-rw-r--r--ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp12
-rw-r--r--ydb/core/tx/datashard/alter_table_unit.cpp4
-rw-r--r--ydb/core/tx/datashard/create_cdc_stream_unit.cpp4
-rw-r--r--ydb/core/tx/datashard/datashard.cpp5
-rw-r--r--ydb/core/tx/datashard/datashard_impl.h5
-rw-r--r--ydb/core/tx/datashard/datashard_ut_change_exchange.cpp38
-rw-r--r--ydb/core/tx/datashard/drop_cdc_stream_unit.cpp4
-rw-r--r--ydb/core/tx/datashard/drop_index_notice_unit.cpp4
-rw-r--r--ydb/core/tx/datashard/initiate_build_index_unit.cpp4
9 files changed, 70 insertions, 10 deletions
diff --git a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp
index e8065a3eea0..c97a2509abf 100644
--- a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp
+++ b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp
@@ -2351,9 +2351,19 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
Value2 String,
PRIMARY KEY (Key)
);
+ )", TTxControl::NoTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+
+ result = db.ExecuteQuery(R"(
UPSERT INTO TestDdlDml2 (Key, Value1, Value2) VALUES (1, "1", "1");
SELECT * FROM TestDdlDml2;
ALTER TABLE TestDdlDml2 DROP COLUMN Value2;
+ )", TTxControl::NoTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::ABORTED, result.GetIssues().ToString());
+
+ result = db.ExecuteQuery(R"(
+ UPSERT INTO TestDdlDml2 (Key, Value1) VALUES (1, "1");
+ SELECT * FROM TestDdlDml2;
UPSERT INTO TestDdlDml2 (Key, Value1) VALUES (2, "2");
SELECT * FROM TestDdlDml2;
CREATE TABLE TestDdlDml33 (
@@ -2363,7 +2373,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
)", TTxControl::NoTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
UNIT_ASSERT_VALUES_EQUAL(result.GetResultSets().size(), 2);
- CompareYson(R"([[[1u];["1"];["1"]]])", FormatResultSetYson(result.GetResultSet(0)));
+ CompareYson(R"([[[1u];["1"]]])", FormatResultSetYson(result.GetResultSet(0)));
CompareYson(R"([[[1u];["1"]];[[2u];["2"]]])", FormatResultSetYson(result.GetResultSet(1)));
UNIT_ASSERT_EQUAL_C(result.GetIssues().Size(), 0, result.GetIssues().ToString());
diff --git a/ydb/core/tx/datashard/alter_table_unit.cpp b/ydb/core/tx/datashard/alter_table_unit.cpp
index 59697e00079..60167f66252 100644
--- a/ydb/core/tx/datashard/alter_table_unit.cpp
+++ b/ydb/core/tx/datashard/alter_table_unit.cpp
@@ -1,4 +1,5 @@
#include "datashard_impl.h"
+#include "datashard_locks_db.h"
#include "datashard_pipeline.h"
#include "execution_unit_ctors.h"
@@ -151,7 +152,8 @@ EExecutionStatus TAlterTableUnit::Execute(TOperation::TPtr op,
}
TUserTable::TPtr info = DataShard.AlterUserTable(ctx, txc, alterTableTx);
- DataShard.AddUserTable(tableId, info);
+ TDataShardLocksDb locksDb(DataShard, txc);
+ DataShard.AddUserTable(tableId, info, &locksDb);
if (info->NeedSchemaSnapshots()) {
DataShard.AddSchemaSnapshot(tableId, version, op->GetStep(), op->GetTxId(), txc, ctx);
diff --git a/ydb/core/tx/datashard/create_cdc_stream_unit.cpp b/ydb/core/tx/datashard/create_cdc_stream_unit.cpp
index 1f690063de5..67b31fbdc6b 100644
--- a/ydb/core/tx/datashard/create_cdc_stream_unit.cpp
+++ b/ydb/core/tx/datashard/create_cdc_stream_unit.cpp
@@ -1,4 +1,5 @@
#include "datashard_impl.h"
+#include "datashard_locks_db.h"
#include "datashard_pipeline.h"
#include "execution_unit_ctors.h"
@@ -43,7 +44,8 @@ public:
Y_ABORT_UNLESS(version);
auto tableInfo = DataShard.AlterTableAddCdcStream(ctx, txc, pathId, version, streamDesc);
- DataShard.AddUserTable(pathId, tableInfo);
+ TDataShardLocksDb locksDb(DataShard, txc);
+ DataShard.AddUserTable(pathId, tableInfo, &locksDb);
if (tableInfo->NeedSchemaSnapshots()) {
DataShard.AddSchemaSnapshot(pathId, version, op->GetStep(), op->GetTxId(), txc, ctx);
diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp
index 17cc5f57020..6771f4517ec 100644
--- a/ydb/core/tx/datashard/datashard.cpp
+++ b/ydb/core/tx/datashard/datashard.cpp
@@ -1890,7 +1890,6 @@ TUserTable::TPtr TDataShard::MoveUserTable(TOperation::TPtr op, const NKikimrTxD
newTableInfo->StatsNeedUpdate = true;
TDataShardLocksDb locksDb(*this, txc);
-
RemoveUserTable(prevId, &locksDb);
AddUserTable(newId, newTableInfo);
@@ -1967,8 +1966,8 @@ TUserTable::TPtr TDataShard::MoveUserIndex(TOperation::TPtr op, const NKikimrTxD
}
newTableInfo->SetSchema(schema);
-
- AddUserTable(pathId, newTableInfo);
+ TDataShardLocksDb locksDb(*this, txc);
+ AddUserTable(pathId, newTableInfo, &locksDb);
if (newTableInfo->NeedSchemaSnapshots()) {
AddSchemaSnapshot(pathId, version, op->GetStep(), op->GetTxId(), txc, ctx);
diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h
index f103724450a..a5c275c22f0 100644
--- a/ydb/core/tx/datashard/datashard_impl.h
+++ b/ydb/core/tx/datashard/datashard_impl.h
@@ -1632,7 +1632,10 @@ public:
TableInfos.erase(tableId.LocalPathId);
}
- void AddUserTable(const TPathId& tableId, TUserTable::TPtr tableInfo) {
+ void AddUserTable(const TPathId& tableId, TUserTable::TPtr tableInfo, ILocksDb* locksDb = nullptr) {
+ if (locksDb) {
+ SysLocks.RemoveSchema(tableId, locksDb);
+ }
TableInfos[tableId.LocalPathId] = tableInfo;
SysLocks.UpdateSchema(tableId, tableInfo->KeyColumnTypes);
Pipeline.GetDepTracker().UpdateSchema(tableId, *tableInfo);
diff --git a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
index 40614b6abe7..e3e61412efc 100644
--- a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
@@ -3868,6 +3868,44 @@ Y_UNIT_TEST_SUITE(Cdc) {
MustNotLoseSchemaSnapshot(true);
}
+ Y_UNIT_TEST(ShouldBreakLocksOnConcurrentSchemeTx) {
+ TPortManager portManager;
+ TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig())
+ .SetUseRealThreads(false)
+ .SetDomainName("Root")
+ );
+
+ auto& runtime = *server->GetRuntime();
+ const auto edgeActor = runtime.AllocateEdgeActor();
+
+ SetupLogging(runtime);
+ InitRoot(server, edgeActor);
+ CreateShardedTable(server, edgeActor, "/Root", "Table", SimpleTable());
+
+ WaitTxNotification(server, edgeActor, AsyncAlterAddStream(server, "/Root", "Table",
+ Updates(NKikimrSchemeOp::ECdcStreamFormatJson)));
+
+ ExecSQL(server, edgeActor, "UPSERT INTO `/Root/Table` (key, value) VALUES (1, 10);");
+
+ TString sessionId;
+ TString txId;
+ KqpSimpleBegin(runtime, sessionId, txId, "UPSERT INTO `/Root/Table` (key, value) VALUES (1, 11);");
+
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleContinue(runtime, sessionId, txId, "SELECT key, value FROM `/Root/Table`;"),
+ "{ items { uint32_value: 1 } items { uint32_value: 11 } }");
+
+ WaitTxNotification(server, edgeActor, AsyncAlterAddExtraColumn(server, "/Root", "Table"));
+
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleCommit(runtime, sessionId, txId, "SELECT 1;"),
+ "ERROR: ABORTED");
+
+ WaitForContent(server, edgeActor, "/Root/Table/Stream", {
+ R"({"update":{"value":10},"key":[1]})",
+ });
+ }
+
Y_UNIT_TEST(ResolvedTimestampsContinueAfterMerge) {
TPortManager portManager;
TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig())
diff --git a/ydb/core/tx/datashard/drop_cdc_stream_unit.cpp b/ydb/core/tx/datashard/drop_cdc_stream_unit.cpp
index 359bdd455f1..64fcf486346 100644
--- a/ydb/core/tx/datashard/drop_cdc_stream_unit.cpp
+++ b/ydb/core/tx/datashard/drop_cdc_stream_unit.cpp
@@ -1,4 +1,5 @@
#include "datashard_impl.h"
+#include "datashard_locks_db.h"
#include "datashard_pipeline.h"
#include "execution_unit_ctors.h"
@@ -40,7 +41,8 @@ public:
Y_ABORT_UNLESS(version);
auto tableInfo = DataShard.AlterTableDropCdcStream(ctx, txc, pathId, version, streamPathId);
- DataShard.AddUserTable(pathId, tableInfo);
+ TDataShardLocksDb locksDb(DataShard, txc);
+ DataShard.AddUserTable(pathId, tableInfo, &locksDb);
if (tableInfo->NeedSchemaSnapshots()) {
DataShard.AddSchemaSnapshot(pathId, version, op->GetStep(), op->GetTxId(), txc, ctx);
diff --git a/ydb/core/tx/datashard/drop_index_notice_unit.cpp b/ydb/core/tx/datashard/drop_index_notice_unit.cpp
index d1df5149c41..6d45bd13a2a 100644
--- a/ydb/core/tx/datashard/drop_index_notice_unit.cpp
+++ b/ydb/core/tx/datashard/drop_index_notice_unit.cpp
@@ -1,4 +1,5 @@
#include "datashard_impl.h"
+#include "datashard_locks_db.h"
#include "datashard_pipeline.h"
#include "execution_unit_ctors.h"
@@ -52,7 +53,8 @@ public:
}
Y_ABORT_UNLESS(tableInfo);
- DataShard.AddUserTable(pathId, tableInfo);
+ TDataShardLocksDb locksDb(DataShard, txc);
+ DataShard.AddUserTable(pathId, tableInfo, &locksDb);
if (tableInfo->NeedSchemaSnapshots()) {
DataShard.AddSchemaSnapshot(pathId, version, op->GetStep(), op->GetTxId(), txc, ctx);
diff --git a/ydb/core/tx/datashard/initiate_build_index_unit.cpp b/ydb/core/tx/datashard/initiate_build_index_unit.cpp
index a2ac702a739..d0bc48a8aa4 100644
--- a/ydb/core/tx/datashard/initiate_build_index_unit.cpp
+++ b/ydb/core/tx/datashard/initiate_build_index_unit.cpp
@@ -1,4 +1,5 @@
#include "datashard_impl.h"
+#include "datashard_locks_db.h"
#include "datashard_pipeline.h"
#include "execution_unit_ctors.h"
@@ -53,7 +54,8 @@ public:
}
Y_ABORT_UNLESS(tableInfo);
- DataShard.AddUserTable(pathId, tableInfo);
+ TDataShardLocksDb locksDb(DataShard, txc);
+ DataShard.AddUserTable(pathId, tableInfo, &locksDb);
if (tableInfo->NeedSchemaSnapshots()) {
DataShard.AddSchemaSnapshot(pathId, version, op->GetStep(), op->GetTxId(), txc, ctx);