aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorNikita Vasilev <ns-vasilev@ydb.tech>2024-07-18 11:22:56 +0300
committerGitHub <noreply@github.com>2024-07-18 11:22:56 +0300
commitc2436ed8947a1281389f3f065614c63cda5acb8d (patch)
treee5c85fb7ed759d1809af6a462c22204bb5427be0
parent5ff39baa13338aa0310607be94962a86af5d7953 (diff)
downloadydb-c2436ed8947a1281389f3f065614c63cda5acb8d.tar.gz
ColumnShard + DataShard reads (#6800)
-rw-r--r--ydb/core/kqp/common/kqp_tx.cpp7
-rw-r--r--ydb/core/kqp/session_actor/kqp_query_state.h4
-rw-r--r--ydb/core/kqp/session_actor/kqp_session_actor.cpp6
-rw-r--r--ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp113
4 files changed, 106 insertions, 24 deletions
diff --git a/ydb/core/kqp/common/kqp_tx.cpp b/ydb/core/kqp/common/kqp_tx.cpp
index 501ee6476b..c51be61a48 100644
--- a/ydb/core/kqp/common/kqp_tx.cpp
+++ b/ydb/core/kqp/common/kqp_tx.cpp
@@ -166,6 +166,13 @@ bool NeedSnapshot(const TKqpTransactionContext& txCtx, const NYql::TKikimrConfig
for (const auto &input : stage.GetInputs()) {
hasStreamLookup |= input.GetTypeCase() == NKqpProto::TKqpPhyConnection::kStreamLookup;
}
+
+ for (const auto &tableOp : stage.GetTableOps()) {
+ if (tableOp.GetTypeCase() == NKqpProto::TKqpPhyTableOperation::kReadOlapRange) {
+ // always need snapshot for OLAP reads
+ return true;
+ }
+ }
}
}
diff --git a/ydb/core/kqp/session_actor/kqp_query_state.h b/ydb/core/kqp/session_actor/kqp_query_state.h
index 85374f257d..8a5b1c146d 100644
--- a/ydb/core/kqp/session_actor/kqp_query_state.h
+++ b/ydb/core/kqp/session_actor/kqp_query_state.h
@@ -312,10 +312,6 @@ public:
bool NeedPersistentSnapshot() const {
auto type = GetType();
- if (type == NKikimrKqp::QUERY_TYPE_SQL_GENERIC_CONCURRENT_QUERY ||
- type == NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY) {
- return ::NKikimr::NKqp::HasOlapTableReadInTx(PreparedQuery->GetPhysicalQuery());
- }
return (
type == NKikimrKqp::QUERY_TYPE_SQL_SCAN ||
type == NKikimrKqp::QUERY_TYPE_AST_SCAN
diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
index c25dd11a53..63db346513 100644
--- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp
+++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
@@ -837,9 +837,10 @@ public:
const NKqpProto::TKqpPhyQuery& phyQuery = QueryState->PreparedQuery->GetPhysicalQuery();
HasOlapTable |= ::NKikimr::NKqp::HasOlapTableReadInTx(phyQuery) || ::NKikimr::NKqp::HasOlapTableWriteInTx(phyQuery);
HasOltpTable |= ::NKikimr::NKqp::HasOltpTableReadInTx(phyQuery) || ::NKikimr::NKqp::HasOltpTableWriteInTx(phyQuery);
- if (HasOlapTable && HasOltpTable) {
+ HasTableWrite |= ::NKikimr::NKqp::HasOlapTableWriteInTx(phyQuery) || ::NKikimr::NKqp::HasOltpTableWriteInTx(phyQuery);
+ if (HasOlapTable && HasOltpTable && HasTableWrite) {
ReplyQueryError(Ydb::StatusIds::PRECONDITION_FAILED,
- "Transactions between column and row tables are disabled at current time.");
+ "Write transactions between column and row tables are disabled at current time.");
return false;
}
QueryState->TxCtx->SetTempTables(QueryState->TempTablesState);
@@ -2536,6 +2537,7 @@ private:
bool HasOlapTable = false;
bool HasOltpTable = false;
+ bool HasTableWrite = false;
TGUCSettings::TPtr GUCSettings;
};
diff --git a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp
index 3ff4e0efca..9422053522 100644
--- a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp
+++ b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp
@@ -2790,7 +2790,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
auto insertResult = client.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
UNIT_ASSERT(!insertResult.IsSuccess());
UNIT_ASSERT_C(
- insertResult.GetIssues().ToString().Contains("Transactions between column and row tables are disabled at current time"),
+ insertResult.GetIssues().ToString().Contains("Write transactions between column and row tables are disabled at current time"),
insertResult.GetIssues().ToString());
}
@@ -2803,20 +2803,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
auto insertResult = client.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
UNIT_ASSERT(!insertResult.IsSuccess());
UNIT_ASSERT_C(
- insertResult.GetIssues().ToString().Contains("Transactions between column and row tables are disabled at current time"),
- insertResult.GetIssues().ToString());
- }
-
- {
- // column & row read
- const TString sql = R"(
- SELECT * FROM `/Root/DataShard`;
- SELECT * FROM `/Root/ColumnShard`;
- )";
- auto insertResult = client.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
- UNIT_ASSERT(!insertResult.IsSuccess());
- UNIT_ASSERT_C(
- insertResult.GetIssues().ToString().Contains("Transactions between column and row tables are disabled at current time"),
+ insertResult.GetIssues().ToString().Contains("Write transactions between column and row tables are disabled at current time"),
insertResult.GetIssues().ToString());
}
@@ -2831,7 +2818,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
auto insertResult = client.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
UNIT_ASSERT(!insertResult.IsSuccess());
UNIT_ASSERT_C(
- insertResult.GetIssues().ToString().Contains("Transactions between column and row tables are disabled at current time"),
+ insertResult.GetIssues().ToString().Contains("Write transactions between column and row tables are disabled at current time"),
insertResult.GetIssues().ToString());
}
@@ -2845,7 +2832,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
auto insertResult = client.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
UNIT_ASSERT(!insertResult.IsSuccess());
UNIT_ASSERT_C(
- insertResult.GetIssues().ToString().Contains("Transactions between column and row tables are disabled at current time"),
+ insertResult.GetIssues().ToString().Contains("Write transactions between column and row tables are disabled at current time"),
insertResult.GetIssues().ToString());
}
@@ -2859,7 +2846,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
auto insertResult = client.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
UNIT_ASSERT(!insertResult.IsSuccess());
UNIT_ASSERT_C(
- insertResult.GetIssues().ToString().Contains("Transactions between column and row tables are disabled at current time"),
+ insertResult.GetIssues().ToString().Contains("Write transactions between column and row tables are disabled at current time"),
insertResult.GetIssues().ToString());
}
}
@@ -3533,6 +3520,96 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
}
}
+ Y_UNIT_TEST(ReadDatashardAndColumnshard) {
+ NKikimrConfig::TAppConfig appConfig;
+ appConfig.MutableTableServiceConfig()->SetEnableOlapSink(true);
+ appConfig.MutableTableServiceConfig()->SetEnableOltpSink(true);
+ auto settings = TKikimrSettings()
+ .SetAppConfig(appConfig)
+ .SetWithSampleTables(false);
+
+ TKikimrRunner kikimr(settings);
+ Tests::NCommon::TLoggerInit(kikimr).Initialize();
+
+ auto client = kikimr.GetQueryClient();
+
+ {
+ auto createTable = client.ExecuteQuery(R"sql(
+ CREATE TABLE `/Root/DataShard` (
+ Col1 Uint64 NOT NULL,
+ Col2 Int32,
+ Col3 String,
+ PRIMARY KEY (Col1)
+ ) WITH (
+ STORE = ROW,
+ AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 10
+ );
+ CREATE TABLE `/Root/ColumnShard` (
+ Col1 Uint64 NOT NULL,
+ Col2 Int32,
+ Col3 String,
+ PRIMARY KEY (Col1)
+ ) WITH (
+ STORE = COLUMN,
+ AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 10
+ );
+ )sql", NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
+ UNIT_ASSERT_C(createTable.IsSuccess(), createTable.GetIssues().ToString());
+ }
+
+ {
+ auto replaceValues = client.ExecuteQuery(R"sql(
+ REPLACE INTO `/Root/DataShard` (Col1, Col2, Col3) VALUES
+ (1u, 1, "row");
+ )sql", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_C(replaceValues.IsSuccess(), replaceValues.GetIssues().ToString());
+ }
+
+ {
+ auto replaceValues = client.ExecuteQuery(R"sql(
+ REPLACE INTO `/Root/ColumnShard` (Col1, Col2, Col3) VALUES
+ (2u, 2, "column");
+ )sql", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_C(replaceValues.IsSuccess(), replaceValues.GetIssues().ToString());
+ }
+
+ {
+ auto it = client.StreamExecuteQuery(R"sql(
+ SELECT * FROM `/Root/ColumnShard`;
+ )sql", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(it.GetStatus(), EStatus::SUCCESS, it.GetIssues().ToString());
+ TString output = StreamResultToYson(it);
+ CompareYson(
+ output,
+ R"([[2u;[2];["column"]]])");
+ }
+
+ {
+ auto it = client.StreamExecuteQuery(R"sql(
+ SELECT * FROM `/Root/DataShard`
+ UNION ALL
+ SELECT * FROM `/Root/ColumnShard`;
+ )sql", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(it.GetStatus(), EStatus::SUCCESS, it.GetIssues().ToString());
+ TString output = StreamResultToYson(it);
+ CompareYson(
+ output,
+ R"([[1u;[1];["row"]];[2u;[2];["column"]]])");
+ }
+
+ {
+ auto it = client.StreamExecuteQuery(R"sql(
+ SELECT r.Col3, c.Col3 FROM `/Root/DataShard` AS r
+ JOIN `/Root/ColumnShard` AS c ON r.Col1 + 1 = c.Col1;
+ )sql", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(it.GetStatus(), EStatus::SUCCESS, it.GetIssues().ToString());
+ TString output = StreamResultToYson(it);
+ CompareYson(
+ output,
+ R"([[["row"];["column"]]])");
+ }
+ }
+
Y_UNIT_TEST(ReplaceIntoWithDefaultValue) {
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnableOlapSink(false);