diff options
author | Nikita Vasilev <ns-vasilev@ydb.tech> | 2024-07-18 11:22:56 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-07-18 11:22:56 +0300 |
commit | c2436ed8947a1281389f3f065614c63cda5acb8d (patch) | |
tree | e5c85fb7ed759d1809af6a462c22204bb5427be0 | |
parent | 5ff39baa13338aa0310607be94962a86af5d7953 (diff) | |
download | ydb-c2436ed8947a1281389f3f065614c63cda5acb8d.tar.gz |
ColumnShard + DataShard reads (#6800)
-rw-r--r-- | ydb/core/kqp/common/kqp_tx.cpp | 7 | ||||
-rw-r--r-- | ydb/core/kqp/session_actor/kqp_query_state.h | 4 | ||||
-rw-r--r-- | ydb/core/kqp/session_actor/kqp_session_actor.cpp | 6 | ||||
-rw-r--r-- | ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp | 113 |
4 files changed, 106 insertions, 24 deletions
diff --git a/ydb/core/kqp/common/kqp_tx.cpp b/ydb/core/kqp/common/kqp_tx.cpp index 501ee6476b..c51be61a48 100644 --- a/ydb/core/kqp/common/kqp_tx.cpp +++ b/ydb/core/kqp/common/kqp_tx.cpp @@ -166,6 +166,13 @@ bool NeedSnapshot(const TKqpTransactionContext& txCtx, const NYql::TKikimrConfig for (const auto &input : stage.GetInputs()) { hasStreamLookup |= input.GetTypeCase() == NKqpProto::TKqpPhyConnection::kStreamLookup; } + + for (const auto &tableOp : stage.GetTableOps()) { + if (tableOp.GetTypeCase() == NKqpProto::TKqpPhyTableOperation::kReadOlapRange) { + // always need snapshot for OLAP reads + return true; + } + } } } diff --git a/ydb/core/kqp/session_actor/kqp_query_state.h b/ydb/core/kqp/session_actor/kqp_query_state.h index 85374f257d..8a5b1c146d 100644 --- a/ydb/core/kqp/session_actor/kqp_query_state.h +++ b/ydb/core/kqp/session_actor/kqp_query_state.h @@ -312,10 +312,6 @@ public: bool NeedPersistentSnapshot() const { auto type = GetType(); - if (type == NKikimrKqp::QUERY_TYPE_SQL_GENERIC_CONCURRENT_QUERY || - type == NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY) { - return ::NKikimr::NKqp::HasOlapTableReadInTx(PreparedQuery->GetPhysicalQuery()); - } return ( type == NKikimrKqp::QUERY_TYPE_SQL_SCAN || type == NKikimrKqp::QUERY_TYPE_AST_SCAN diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index c25dd11a53..63db346513 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -837,9 +837,10 @@ public: const NKqpProto::TKqpPhyQuery& phyQuery = QueryState->PreparedQuery->GetPhysicalQuery(); HasOlapTable |= ::NKikimr::NKqp::HasOlapTableReadInTx(phyQuery) || ::NKikimr::NKqp::HasOlapTableWriteInTx(phyQuery); HasOltpTable |= ::NKikimr::NKqp::HasOltpTableReadInTx(phyQuery) || ::NKikimr::NKqp::HasOltpTableWriteInTx(phyQuery); - if (HasOlapTable && HasOltpTable) { + HasTableWrite |= ::NKikimr::NKqp::HasOlapTableWriteInTx(phyQuery) || ::NKikimr::NKqp::HasOltpTableWriteInTx(phyQuery); + if (HasOlapTable && HasOltpTable && HasTableWrite) { ReplyQueryError(Ydb::StatusIds::PRECONDITION_FAILED, - "Transactions between column and row tables are disabled at current time."); + "Write transactions between column and row tables are disabled at current time."); return false; } QueryState->TxCtx->SetTempTables(QueryState->TempTablesState); @@ -2536,6 +2537,7 @@ private: bool HasOlapTable = false; bool HasOltpTable = false; + bool HasTableWrite = false; TGUCSettings::TPtr GUCSettings; }; diff --git a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp index 3ff4e0efca..9422053522 100644 --- a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp +++ b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp @@ -2790,7 +2790,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { auto insertResult = client.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync(); UNIT_ASSERT(!insertResult.IsSuccess()); UNIT_ASSERT_C( - insertResult.GetIssues().ToString().Contains("Transactions between column and row tables are disabled at current time"), + insertResult.GetIssues().ToString().Contains("Write transactions between column and row tables are disabled at current time"), insertResult.GetIssues().ToString()); } @@ -2803,20 +2803,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { auto insertResult = client.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync(); UNIT_ASSERT(!insertResult.IsSuccess()); UNIT_ASSERT_C( - insertResult.GetIssues().ToString().Contains("Transactions between column and row tables are disabled at current time"), - insertResult.GetIssues().ToString()); - } - - { - // column & row read - const TString sql = R"( - SELECT * FROM `/Root/DataShard`; - SELECT * FROM `/Root/ColumnShard`; - )"; - auto insertResult = client.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync(); - UNIT_ASSERT(!insertResult.IsSuccess()); - UNIT_ASSERT_C( - insertResult.GetIssues().ToString().Contains("Transactions between column and row tables are disabled at current time"), + insertResult.GetIssues().ToString().Contains("Write transactions between column and row tables are disabled at current time"), insertResult.GetIssues().ToString()); } @@ -2831,7 +2818,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { auto insertResult = client.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync(); UNIT_ASSERT(!insertResult.IsSuccess()); UNIT_ASSERT_C( - insertResult.GetIssues().ToString().Contains("Transactions between column and row tables are disabled at current time"), + insertResult.GetIssues().ToString().Contains("Write transactions between column and row tables are disabled at current time"), insertResult.GetIssues().ToString()); } @@ -2845,7 +2832,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { auto insertResult = client.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync(); UNIT_ASSERT(!insertResult.IsSuccess()); UNIT_ASSERT_C( - insertResult.GetIssues().ToString().Contains("Transactions between column and row tables are disabled at current time"), + insertResult.GetIssues().ToString().Contains("Write transactions between column and row tables are disabled at current time"), insertResult.GetIssues().ToString()); } @@ -2859,7 +2846,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { auto insertResult = client.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync(); UNIT_ASSERT(!insertResult.IsSuccess()); UNIT_ASSERT_C( - insertResult.GetIssues().ToString().Contains("Transactions between column and row tables are disabled at current time"), + insertResult.GetIssues().ToString().Contains("Write transactions between column and row tables are disabled at current time"), insertResult.GetIssues().ToString()); } } @@ -3533,6 +3520,96 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { } } + Y_UNIT_TEST(ReadDatashardAndColumnshard) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableOlapSink(true); + appConfig.MutableTableServiceConfig()->SetEnableOltpSink(true); + auto settings = TKikimrSettings() + .SetAppConfig(appConfig) + .SetWithSampleTables(false); + + TKikimrRunner kikimr(settings); + Tests::NCommon::TLoggerInit(kikimr).Initialize(); + + auto client = kikimr.GetQueryClient(); + + { + auto createTable = client.ExecuteQuery(R"sql( + CREATE TABLE `/Root/DataShard` ( + Col1 Uint64 NOT NULL, + Col2 Int32, + Col3 String, + PRIMARY KEY (Col1) + ) WITH ( + STORE = ROW, + AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 10 + ); + CREATE TABLE `/Root/ColumnShard` ( + Col1 Uint64 NOT NULL, + Col2 Int32, + Col3 String, + PRIMARY KEY (Col1) + ) WITH ( + STORE = COLUMN, + AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 10 + ); + )sql", NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync(); + UNIT_ASSERT_C(createTable.IsSuccess(), createTable.GetIssues().ToString()); + } + + { + auto replaceValues = client.ExecuteQuery(R"sql( + REPLACE INTO `/Root/DataShard` (Col1, Col2, Col3) VALUES + (1u, 1, "row"); + )sql", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_C(replaceValues.IsSuccess(), replaceValues.GetIssues().ToString()); + } + + { + auto replaceValues = client.ExecuteQuery(R"sql( + REPLACE INTO `/Root/ColumnShard` (Col1, Col2, Col3) VALUES + (2u, 2, "column"); + )sql", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_C(replaceValues.IsSuccess(), replaceValues.GetIssues().ToString()); + } + + { + auto it = client.StreamExecuteQuery(R"sql( + SELECT * FROM `/Root/ColumnShard`; + )sql", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(it.GetStatus(), EStatus::SUCCESS, it.GetIssues().ToString()); + TString output = StreamResultToYson(it); + CompareYson( + output, + R"([[2u;[2];["column"]]])"); + } + + { + auto it = client.StreamExecuteQuery(R"sql( + SELECT * FROM `/Root/DataShard` + UNION ALL + SELECT * FROM `/Root/ColumnShard`; + )sql", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(it.GetStatus(), EStatus::SUCCESS, it.GetIssues().ToString()); + TString output = StreamResultToYson(it); + CompareYson( + output, + R"([[1u;[1];["row"]];[2u;[2];["column"]]])"); + } + + { + auto it = client.StreamExecuteQuery(R"sql( + SELECT r.Col3, c.Col3 FROM `/Root/DataShard` AS r + JOIN `/Root/ColumnShard` AS c ON r.Col1 + 1 = c.Col1; + )sql", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(it.GetStatus(), EStatus::SUCCESS, it.GetIssues().ToString()); + TString output = StreamResultToYson(it); + CompareYson( + output, + R"([[["row"];["column"]]])"); + } + } + Y_UNIT_TEST(ReplaceIntoWithDefaultValue) { NKikimrConfig::TAppConfig appConfig; appConfig.MutableTableServiceConfig()->SetEnableOlapSink(false); |