aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorNikita Vasilev <ns-vasilev@ydb.tech>2024-01-30 18:22:01 +0300
committerGitHub <noreply@github.com>2024-01-30 18:22:01 +0300
commit093c9af3a29c8f04b143664654e53db847321bef (patch)
tree30457ab2729b35e6c5cdcf3caa4967cf7a47cc7a
parentd2c589a159a54b189cb8c005cb143431a83a38e4 (diff)
downloadydb-093c9af3a29c8f04b143664654e53db847321bef.tar.gz
Temporary disable mixed column/row tables transactions (#1412)
-rw-r--r--ydb/core/kqp/session_actor/kqp_query_state.h2
-rw-r--r--ydb/core/kqp/session_actor/kqp_session_actor.cpp6
-rw-r--r--ydb/core/kqp/session_actor/kqp_tx.cpp51
-rw-r--r--ydb/core/kqp/session_actor/kqp_tx.h5
-rw-r--r--ydb/core/kqp/ut/olap/kqp_olap_ut.cpp84
5 files changed, 111 insertions, 37 deletions
diff --git a/ydb/core/kqp/session_actor/kqp_query_state.h b/ydb/core/kqp/session_actor/kqp_query_state.h
index 192b923d8f..4bcc4cc47f 100644
--- a/ydb/core/kqp/session_actor/kqp_query_state.h
+++ b/ydb/core/kqp/session_actor/kqp_query_state.h
@@ -256,7 +256,7 @@ public:
auto type = GetType();
if (type == NKikimrKqp::QUERY_TYPE_SQL_GENERIC_CONCURRENT_QUERY ||
type == NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY) {
- return ::NKikimr::NKqp::HasOlapTableInTx(PreparedQuery->GetPhysicalQuery());
+ return ::NKikimr::NKqp::HasOlapTableReadInTx(PreparedQuery->GetPhysicalQuery());
}
return (
type == NKikimrKqp::QUERY_TYPE_SQL_SCAN ||
diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
index fe2a661c3c..cdbeab7595 100644
--- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp
+++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
@@ -677,6 +677,12 @@ public:
}
const NKqpProto::TKqpPhyQuery& phyQuery = QueryState->PreparedQuery->GetPhysicalQuery();
+ if ((::NKikimr::NKqp::HasOlapTableReadInTx(phyQuery) || ::NKikimr::NKqp::HasOlapTableWriteInTx(phyQuery))
+ && (::NKikimr::NKqp::HasOltpTableReadInTx(phyQuery) || ::NKikimr::NKqp::HasOltpTableWriteInTx(phyQuery))) {
+ ReplyQueryError(Ydb::StatusIds::PRECONDITION_FAILED,
+ "Transactions between column and row tables are disabled at current time.");
+ return false;
+ }
QueryState->TxCtx->SetTempTables(QueryState->TempTablesState);
auto [success, issues] = QueryState->TxCtx->ApplyTableOperations(phyQuery.GetTableOps(), phyQuery.GetTableInfos(),
EKikimrQueryType::Dml);
diff --git a/ydb/core/kqp/session_actor/kqp_tx.cpp b/ydb/core/kqp/session_actor/kqp_tx.cpp
index b419125c66..2e9bcc6b20 100644
--- a/ydb/core/kqp/session_actor/kqp_tx.cpp
+++ b/ydb/core/kqp/session_actor/kqp_tx.cpp
@@ -178,7 +178,7 @@ bool NeedSnapshot(const TKqpTransactionContext& txCtx, const NYql::TKikimrConfig
return readPhases > 1;
}
-bool HasOlapTableInTx(const NKqpProto::TKqpPhyQuery& physicalQuery) {
+bool HasOlapTableReadInTx(const NKqpProto::TKqpPhyQuery& physicalQuery) {
for (const auto &tx : physicalQuery.GetTransactions()) {
for (const auto &stage : tx.GetStages()) {
for (const auto &tableOp : stage.GetTableOps()) {
@@ -191,5 +191,54 @@ bool HasOlapTableInTx(const NKqpProto::TKqpPhyQuery& physicalQuery) {
return false;
}
+bool HasOlapTableWriteInTx(const NKqpProto::TKqpPhyQuery& physicalQuery) {
+ for (const auto &tx : physicalQuery.GetTransactions()) {
+ for (const auto &stage : tx.GetStages()) {
+ for (const auto& sink : stage.GetSinks()) {
+ if (sink.GetTypeCase() == NKqpProto::TKqpSink::kInternalSink) {
+ return true;
+ }
+ }
+ }
+ }
+ return false;
+}
+
+bool HasOltpTableReadInTx(const NKqpProto::TKqpPhyQuery& physicalQuery) {
+ for (const auto &tx : physicalQuery.GetTransactions()) {
+ for (const auto &stage : tx.GetStages()) {
+ for (const auto &tableOp : stage.GetTableOps()) {
+ switch (tableOp.GetTypeCase()) {
+ case NKqpProto::TKqpPhyTableOperation::kReadRange:
+ case NKqpProto::TKqpPhyTableOperation::kLookup:
+ case NKqpProto::TKqpPhyTableOperation::kReadRanges:
+ return true;
+ case NKqpProto::TKqpPhyTableOperation::kReadOlapRange:
+ case NKqpProto::TKqpPhyTableOperation::kUpsertRows:
+ case NKqpProto::TKqpPhyTableOperation::kDeleteRows:
+ break;
+ default:
+ YQL_ENSURE(false, "unexpected type");
+ }
+ }
+ }
+ }
+ return false;
+}
+
+bool HasOltpTableWriteInTx(const NKqpProto::TKqpPhyQuery& physicalQuery) {
+ for (const auto &tx : physicalQuery.GetTransactions()) {
+ for (const auto &stage : tx.GetStages()) {
+ for (const auto &tableOp : stage.GetTableOps()) {
+ if (tableOp.GetTypeCase() == NKqpProto::TKqpPhyTableOperation::kUpsertRows
+ || tableOp.GetTypeCase() == NKqpProto::TKqpPhyTableOperation::kDeleteRows) {
+ return true;
+ }
+ }
+ }
+ }
+ return false;
+}
+
} // namespace NKqp
} // namespace NKikimr
diff --git a/ydb/core/kqp/session_actor/kqp_tx.h b/ydb/core/kqp/session_actor/kqp_tx.h
index ff2237bf97..171d6030b9 100644
--- a/ydb/core/kqp/session_actor/kqp_tx.h
+++ b/ydb/core/kqp/session_actor/kqp_tx.h
@@ -440,6 +440,9 @@ std::pair<bool, std::vector<NYql::TIssue>> MergeLocks(const NKikimrMiniKQL::TTyp
bool NeedSnapshot(const TKqpTransactionContext& txCtx, const NYql::TKikimrConfiguration& config, bool rollbackTx,
bool commitTx, const NKqpProto::TKqpPhyQuery& physicalQuery);
-bool HasOlapTableInTx(const NKqpProto::TKqpPhyQuery& physicalQuery);
+bool HasOlapTableReadInTx(const NKqpProto::TKqpPhyQuery& physicalQuery);
+bool HasOlapTableWriteInTx(const NKqpProto::TKqpPhyQuery& physicalQuery);
+bool HasOltpTableReadInTx(const NKqpProto::TKqpPhyQuery& physicalQuery);
+bool HasOltpTableWriteInTx(const NKqpProto::TKqpPhyQuery& physicalQuery);
} // namespace NKikimr::NKqp
diff --git a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
index d8d10c18fb..d9f49cd4b0 100644
--- a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
+++ b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
@@ -5560,6 +5560,15 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
auto session = kikimr.GetTableClient().CreateSession().GetValueSync().GetSession();
const TString query = R"(
+ CREATE TABLE `/Root/ColumnSource` (
+ Col1 Uint64 NOT NULL,
+ Col2 String,
+ Col3 Int32 NOT NULL,
+ PRIMARY KEY (Col1)
+ )
+ PARTITION BY HASH(Col1)
+ WITH (STORE = COLUMN, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 10);
+
CREATE TABLE `/Root/DataShard1` (
Col1 Uint64 NOT NULL,
Col2 String,
@@ -5586,6 +5595,15 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
PARTITION BY HASH(Col1)
WITH (STORE = COLUMN, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 16);
+ CREATE TABLE `/Root/ColumnShard3` (
+ Col1 Uint64 NOT NULL,
+ Col2 String,
+ Col3 Int32 NOT NULL,
+ PRIMARY KEY (Col1)
+ )
+ PARTITION BY HASH(Col1)
+ WITH (STORE = COLUMN, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 16);
+
CREATE TABLE `/Root/DataShard2` (
Col1 Uint64 NOT NULL,
Col2 String,
@@ -5599,34 +5617,19 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
auto client = kikimr.GetQueryClient();
- auto prepareResult = client.ExecuteQuery(R"(
- REPLACE INTO `/Root/DataShard1` (Col1, Col2, Col3) VALUES
- (1u, "test1", 10), (2u, "test2", 11), (3u, "test3", 12), (4u, NULL, 13);
- )", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
- UNIT_ASSERT_C(prepareResult.IsSuccess(), prepareResult.GetIssues().ToString());
-
{
- // row -> column
- const TString sql = R"(
- REPLACE INTO `/Root/ColumnShard1`
- SELECT * FROM `/Root/DataShard1`
- )";
- auto insertResult = client.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
- UNIT_ASSERT_C(insertResult.IsSuccess(), insertResult.GetIssues().ToString());
-
- auto it = client.StreamExecuteQuery(R"(
- SELECT * FROM `/Root/ColumnShard1` ORDER BY Col1, Col2, Col3;
+ auto prepareResult = client.ExecuteQuery(R"(
+ REPLACE INTO `/Root/ColumnSource` (Col1, Col2, Col3) VALUES
+ (1u, "test1", 10), (2u, "test2", 11), (3u, "test3", 12), (4u, NULL, 13);
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
- UNIT_ASSERT_VALUES_EQUAL_C(it.GetStatus(), EStatus::SUCCESS, it.GetIssues().ToString());
- TString output = StreamResultToYson(it);
- CompareYson(output, R"([[1u;["test1"];10];[2u;["test2"];11];[3u;["test3"];12];[4u;#;13]])");
+ UNIT_ASSERT_C(prepareResult.IsSuccess(), prepareResult.GetIssues().ToString());
}
{
// Missing Nullable column
const TString sql = R"(
REPLACE INTO `/Root/ColumnShard1`
- SELECT 10u + Col1 AS Col1, 100 + Col3 AS Col3 FROM `/Root/DataShard1`
+ SELECT 10u + Col1 AS Col1, 100 + Col3 AS Col3 FROM `/Root/ColumnSource`
)";
auto insertResult = client.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
UNIT_ASSERT_C(insertResult.IsSuccess(), insertResult.GetIssues().ToString());
@@ -5638,14 +5641,14 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
TString output = StreamResultToYson(it);
CompareYson(
output,
- R"([[1u;["test1"];10];[2u;["test2"];11];[3u;["test3"];12];[4u;#;13];[11u;#;110];[12u;#;111];[13u;#;112];[14u;#;113]])");
+ R"([[11u;#;110];[12u;#;111];[13u;#;112];[14u;#;113]])");
}
{
// column -> column
const TString sql = R"(
REPLACE INTO `/Root/ColumnShard2`
- SELECT * FROM `/Root/ColumnShard1`
+ SELECT * FROM `/Root/ColumnSource`
)";
auto insertResult = client.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
UNIT_ASSERT_C(insertResult.IsSuccess(), insertResult.GetIssues().ToString());
@@ -5657,26 +5660,39 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
TString output = StreamResultToYson(it);
CompareYson(
output,
- R"([[1u;["test1"];10];[2u;["test2"];11];[3u;["test3"];12];[4u;#;13];[11u;#;110];[12u;#;111];[13u;#;112];[14u;#;113]])");
+ R"([[1u;["test1"];10];[2u;["test2"];11];[3u;["test3"];12];[4u;#;13]])");
+ }
+
+ {
+ auto prepareResult = client.ExecuteQuery(R"(
+ REPLACE INTO `/Root/DataShard1` (Col1, Col2, Col3) VALUES
+ (1u, "test1", 10), (2u, "test2", 11), (3u, "test3", 12), (4u, NULL, 13);
+ )", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_C(prepareResult.IsSuccess(), prepareResult.GetIssues().ToString());
+
+ // row -> column
+ const TString sql = R"(
+ REPLACE INTO `/Root/ColumnShard3`
+ SELECT * FROM `/Root/DataShard1`
+ )";
+ auto insertResult = client.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
+ UNIT_ASSERT(!insertResult.IsSuccess());
+ UNIT_ASSERT_C(
+ insertResult.GetIssues().ToString().Contains("Transactions between column and row tables are disabled at current time"),
+ insertResult.GetIssues().ToString());
}
{
// column -> row
const TString sql = R"(
REPLACE INTO `/Root/DataShard2`
- SELECT * FROM `/Root/ColumnShard2`
+ SELECT * FROM `/Root/ColumnSource`
)";
auto insertResult = client.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
- UNIT_ASSERT_C(insertResult.IsSuccess(), insertResult.GetIssues().ToString());
-
- auto it = client.StreamExecuteQuery(R"(
- SELECT * FROM `/Root/DataShard2` ORDER BY Col1, Col2, Col3;
- )", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
- UNIT_ASSERT_VALUES_EQUAL_C(it.GetStatus(), EStatus::SUCCESS, it.GetIssues().ToString());
- TString output = StreamResultToYson(it);
- CompareYson(
- output,
- R"([[1u;["test1"];10];[2u;["test2"];11];[3u;["test3"];12];[4u;#;13];[11u;#;110];[12u;#;111];[13u;#;112];[14u;#;113]])");
+ UNIT_ASSERT(!insertResult.IsSuccess());
+ UNIT_ASSERT_C(
+ insertResult.GetIssues().ToString().Contains("Transactions between column and row tables are disabled at current time"),
+ insertResult.GetIssues().ToString());
}
}