diff options
author | Nikita Vasilev <ns-vasilev@ydb.tech> | 2024-01-30 18:22:01 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-01-30 18:22:01 +0300 |
commit | 093c9af3a29c8f04b143664654e53db847321bef (patch) | |
tree | 30457ab2729b35e6c5cdcf3caa4967cf7a47cc7a | |
parent | d2c589a159a54b189cb8c005cb143431a83a38e4 (diff) | |
download | ydb-093c9af3a29c8f04b143664654e53db847321bef.tar.gz |
Temporary disable mixed column/row tables transactions (#1412)
-rw-r--r-- | ydb/core/kqp/session_actor/kqp_query_state.h | 2 | ||||
-rw-r--r-- | ydb/core/kqp/session_actor/kqp_session_actor.cpp | 6 | ||||
-rw-r--r-- | ydb/core/kqp/session_actor/kqp_tx.cpp | 51 | ||||
-rw-r--r-- | ydb/core/kqp/session_actor/kqp_tx.h | 5 | ||||
-rw-r--r-- | ydb/core/kqp/ut/olap/kqp_olap_ut.cpp | 84 |
5 files changed, 111 insertions, 37 deletions
diff --git a/ydb/core/kqp/session_actor/kqp_query_state.h b/ydb/core/kqp/session_actor/kqp_query_state.h index 192b923d8f..4bcc4cc47f 100644 --- a/ydb/core/kqp/session_actor/kqp_query_state.h +++ b/ydb/core/kqp/session_actor/kqp_query_state.h @@ -256,7 +256,7 @@ public: auto type = GetType(); if (type == NKikimrKqp::QUERY_TYPE_SQL_GENERIC_CONCURRENT_QUERY || type == NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY) { - return ::NKikimr::NKqp::HasOlapTableInTx(PreparedQuery->GetPhysicalQuery()); + return ::NKikimr::NKqp::HasOlapTableReadInTx(PreparedQuery->GetPhysicalQuery()); } return ( type == NKikimrKqp::QUERY_TYPE_SQL_SCAN || diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index fe2a661c3c..cdbeab7595 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -677,6 +677,12 @@ public: } const NKqpProto::TKqpPhyQuery& phyQuery = QueryState->PreparedQuery->GetPhysicalQuery(); + if ((::NKikimr::NKqp::HasOlapTableReadInTx(phyQuery) || ::NKikimr::NKqp::HasOlapTableWriteInTx(phyQuery)) + && (::NKikimr::NKqp::HasOltpTableReadInTx(phyQuery) || ::NKikimr::NKqp::HasOltpTableWriteInTx(phyQuery))) { + ReplyQueryError(Ydb::StatusIds::PRECONDITION_FAILED, + "Transactions between column and row tables are disabled at current time."); + return false; + } QueryState->TxCtx->SetTempTables(QueryState->TempTablesState); auto [success, issues] = QueryState->TxCtx->ApplyTableOperations(phyQuery.GetTableOps(), phyQuery.GetTableInfos(), EKikimrQueryType::Dml); diff --git a/ydb/core/kqp/session_actor/kqp_tx.cpp b/ydb/core/kqp/session_actor/kqp_tx.cpp index b419125c66..2e9bcc6b20 100644 --- a/ydb/core/kqp/session_actor/kqp_tx.cpp +++ b/ydb/core/kqp/session_actor/kqp_tx.cpp @@ -178,7 +178,7 @@ bool NeedSnapshot(const TKqpTransactionContext& txCtx, const NYql::TKikimrConfig return readPhases > 1; } -bool HasOlapTableInTx(const NKqpProto::TKqpPhyQuery& physicalQuery) { +bool HasOlapTableReadInTx(const NKqpProto::TKqpPhyQuery& physicalQuery) { for (const auto &tx : physicalQuery.GetTransactions()) { for (const auto &stage : tx.GetStages()) { for (const auto &tableOp : stage.GetTableOps()) { @@ -191,5 +191,54 @@ bool HasOlapTableInTx(const NKqpProto::TKqpPhyQuery& physicalQuery) { return false; } +bool HasOlapTableWriteInTx(const NKqpProto::TKqpPhyQuery& physicalQuery) { + for (const auto &tx : physicalQuery.GetTransactions()) { + for (const auto &stage : tx.GetStages()) { + for (const auto& sink : stage.GetSinks()) { + if (sink.GetTypeCase() == NKqpProto::TKqpSink::kInternalSink) { + return true; + } + } + } + } + return false; +} + +bool HasOltpTableReadInTx(const NKqpProto::TKqpPhyQuery& physicalQuery) { + for (const auto &tx : physicalQuery.GetTransactions()) { + for (const auto &stage : tx.GetStages()) { + for (const auto &tableOp : stage.GetTableOps()) { + switch (tableOp.GetTypeCase()) { + case NKqpProto::TKqpPhyTableOperation::kReadRange: + case NKqpProto::TKqpPhyTableOperation::kLookup: + case NKqpProto::TKqpPhyTableOperation::kReadRanges: + return true; + case NKqpProto::TKqpPhyTableOperation::kReadOlapRange: + case NKqpProto::TKqpPhyTableOperation::kUpsertRows: + case NKqpProto::TKqpPhyTableOperation::kDeleteRows: + break; + default: + YQL_ENSURE(false, "unexpected type"); + } + } + } + } + return false; +} + +bool HasOltpTableWriteInTx(const NKqpProto::TKqpPhyQuery& physicalQuery) { + for (const auto &tx : physicalQuery.GetTransactions()) { + for (const auto &stage : tx.GetStages()) { + for (const auto &tableOp : stage.GetTableOps()) { + if (tableOp.GetTypeCase() == NKqpProto::TKqpPhyTableOperation::kUpsertRows + || tableOp.GetTypeCase() == NKqpProto::TKqpPhyTableOperation::kDeleteRows) { + return true; + } + } + } + } + return false; +} + } // namespace NKqp } // namespace NKikimr diff --git a/ydb/core/kqp/session_actor/kqp_tx.h b/ydb/core/kqp/session_actor/kqp_tx.h index ff2237bf97..171d6030b9 100644 --- a/ydb/core/kqp/session_actor/kqp_tx.h +++ b/ydb/core/kqp/session_actor/kqp_tx.h @@ -440,6 +440,9 @@ std::pair<bool, std::vector<NYql::TIssue>> MergeLocks(const NKikimrMiniKQL::TTyp bool NeedSnapshot(const TKqpTransactionContext& txCtx, const NYql::TKikimrConfiguration& config, bool rollbackTx, bool commitTx, const NKqpProto::TKqpPhyQuery& physicalQuery); -bool HasOlapTableInTx(const NKqpProto::TKqpPhyQuery& physicalQuery); +bool HasOlapTableReadInTx(const NKqpProto::TKqpPhyQuery& physicalQuery); +bool HasOlapTableWriteInTx(const NKqpProto::TKqpPhyQuery& physicalQuery); +bool HasOltpTableReadInTx(const NKqpProto::TKqpPhyQuery& physicalQuery); +bool HasOltpTableWriteInTx(const NKqpProto::TKqpPhyQuery& physicalQuery); } // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp index d8d10c18fb..d9f49cd4b0 100644 --- a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp +++ b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp @@ -5560,6 +5560,15 @@ Y_UNIT_TEST_SUITE(KqpOlap) { auto session = kikimr.GetTableClient().CreateSession().GetValueSync().GetSession(); const TString query = R"( + CREATE TABLE `/Root/ColumnSource` ( + Col1 Uint64 NOT NULL, + Col2 String, + Col3 Int32 NOT NULL, + PRIMARY KEY (Col1) + ) + PARTITION BY HASH(Col1) + WITH (STORE = COLUMN, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 10); + CREATE TABLE `/Root/DataShard1` ( Col1 Uint64 NOT NULL, Col2 String, @@ -5586,6 +5595,15 @@ Y_UNIT_TEST_SUITE(KqpOlap) { PARTITION BY HASH(Col1) WITH (STORE = COLUMN, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 16); + CREATE TABLE `/Root/ColumnShard3` ( + Col1 Uint64 NOT NULL, + Col2 String, + Col3 Int32 NOT NULL, + PRIMARY KEY (Col1) + ) + PARTITION BY HASH(Col1) + WITH (STORE = COLUMN, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 16); + CREATE TABLE `/Root/DataShard2` ( Col1 Uint64 NOT NULL, Col2 String, @@ -5599,34 +5617,19 @@ Y_UNIT_TEST_SUITE(KqpOlap) { UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); auto client = kikimr.GetQueryClient(); - auto prepareResult = client.ExecuteQuery(R"( - REPLACE INTO `/Root/DataShard1` (Col1, Col2, Col3) VALUES - (1u, "test1", 10), (2u, "test2", 11), (3u, "test3", 12), (4u, NULL, 13); - )", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); - UNIT_ASSERT_C(prepareResult.IsSuccess(), prepareResult.GetIssues().ToString()); - { - // row -> column - const TString sql = R"( - REPLACE INTO `/Root/ColumnShard1` - SELECT * FROM `/Root/DataShard1` - )"; - auto insertResult = client.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync(); - UNIT_ASSERT_C(insertResult.IsSuccess(), insertResult.GetIssues().ToString()); - - auto it = client.StreamExecuteQuery(R"( - SELECT * FROM `/Root/ColumnShard1` ORDER BY Col1, Col2, Col3; + auto prepareResult = client.ExecuteQuery(R"( + REPLACE INTO `/Root/ColumnSource` (Col1, Col2, Col3) VALUES + (1u, "test1", 10), (2u, "test2", 11), (3u, "test3", 12), (4u, NULL, 13); )", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(it.GetStatus(), EStatus::SUCCESS, it.GetIssues().ToString()); - TString output = StreamResultToYson(it); - CompareYson(output, R"([[1u;["test1"];10];[2u;["test2"];11];[3u;["test3"];12];[4u;#;13]])"); + UNIT_ASSERT_C(prepareResult.IsSuccess(), prepareResult.GetIssues().ToString()); } { // Missing Nullable column const TString sql = R"( REPLACE INTO `/Root/ColumnShard1` - SELECT 10u + Col1 AS Col1, 100 + Col3 AS Col3 FROM `/Root/DataShard1` + SELECT 10u + Col1 AS Col1, 100 + Col3 AS Col3 FROM `/Root/ColumnSource` )"; auto insertResult = client.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync(); UNIT_ASSERT_C(insertResult.IsSuccess(), insertResult.GetIssues().ToString()); @@ -5638,14 +5641,14 @@ Y_UNIT_TEST_SUITE(KqpOlap) { TString output = StreamResultToYson(it); CompareYson( output, - R"([[1u;["test1"];10];[2u;["test2"];11];[3u;["test3"];12];[4u;#;13];[11u;#;110];[12u;#;111];[13u;#;112];[14u;#;113]])"); + R"([[11u;#;110];[12u;#;111];[13u;#;112];[14u;#;113]])"); } { // column -> column const TString sql = R"( REPLACE INTO `/Root/ColumnShard2` - SELECT * FROM `/Root/ColumnShard1` + SELECT * FROM `/Root/ColumnSource` )"; auto insertResult = client.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync(); UNIT_ASSERT_C(insertResult.IsSuccess(), insertResult.GetIssues().ToString()); @@ -5657,26 +5660,39 @@ Y_UNIT_TEST_SUITE(KqpOlap) { TString output = StreamResultToYson(it); CompareYson( output, - R"([[1u;["test1"];10];[2u;["test2"];11];[3u;["test3"];12];[4u;#;13];[11u;#;110];[12u;#;111];[13u;#;112];[14u;#;113]])"); + R"([[1u;["test1"];10];[2u;["test2"];11];[3u;["test3"];12];[4u;#;13]])"); + } + + { + auto prepareResult = client.ExecuteQuery(R"( + REPLACE INTO `/Root/DataShard1` (Col1, Col2, Col3) VALUES + (1u, "test1", 10), (2u, "test2", 11), (3u, "test3", 12), (4u, NULL, 13); + )", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_C(prepareResult.IsSuccess(), prepareResult.GetIssues().ToString()); + + // row -> column + const TString sql = R"( + REPLACE INTO `/Root/ColumnShard3` + SELECT * FROM `/Root/DataShard1` + )"; + auto insertResult = client.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync(); + UNIT_ASSERT(!insertResult.IsSuccess()); + UNIT_ASSERT_C( + insertResult.GetIssues().ToString().Contains("Transactions between column and row tables are disabled at current time"), + insertResult.GetIssues().ToString()); } { // column -> row const TString sql = R"( REPLACE INTO `/Root/DataShard2` - SELECT * FROM `/Root/ColumnShard2` + SELECT * FROM `/Root/ColumnSource` )"; auto insertResult = client.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync(); - UNIT_ASSERT_C(insertResult.IsSuccess(), insertResult.GetIssues().ToString()); - - auto it = client.StreamExecuteQuery(R"( - SELECT * FROM `/Root/DataShard2` ORDER BY Col1, Col2, Col3; - )", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(it.GetStatus(), EStatus::SUCCESS, it.GetIssues().ToString()); - TString output = StreamResultToYson(it); - CompareYson( - output, - R"([[1u;["test1"];10];[2u;["test2"];11];[3u;["test3"];12];[4u;#;13];[11u;#;110];[12u;#;111];[13u;#;112];[14u;#;113]])"); + UNIT_ASSERT(!insertResult.IsSuccess()); + UNIT_ASSERT_C( + insertResult.GetIssues().ToString().Contains("Transactions between column and row tables are disabled at current time"), + insertResult.GetIssues().ToString()); } } |