aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorNikita Vasilev <ns-vasilev@ydb.tech>2025-04-22 20:00:40 +0300
committerGitHub <noreply@github.com>2025-04-22 20:00:40 +0300
commitc9d786654e44f473624bd81ee017dea6559e80b1 (patch)
treee9602024387c30c6876dab78b96ccc6cc8be9269
parent7a3575ff900bc1ec9529d4b8deea55ffab1335a5 (diff)
downloadydb-c9d786654e44f473624bd81ee017dea6559e80b1.tar.gz
Tests for named expressions with sinks (#17540)
-rw-r--r--ydb/core/kqp/ut/opt/kqp_named_expressions_ut.cpp404
1 files changed, 404 insertions, 0 deletions
diff --git a/ydb/core/kqp/ut/opt/kqp_named_expressions_ut.cpp b/ydb/core/kqp/ut/opt/kqp_named_expressions_ut.cpp
index 3eac0573cfc..63f86347b30 100644
--- a/ydb/core/kqp/ut/opt/kqp_named_expressions_ut.cpp
+++ b/ydb/core/kqp/ut/opt/kqp_named_expressions_ut.cpp
@@ -461,6 +461,410 @@ Y_UNIT_TEST_SUITE(KqpNamedExpressions) {
}
}
}
+
+ Y_UNIT_TEST_QUAD(NamedExpressionRandomUpsertIndex, UseSink, UseDataQuery) {
+ NKikimrConfig::TAppConfig appConfig;
+ appConfig.MutableTableServiceConfig()->SetEnableOltpSink(UseSink);
+ auto settings = TKikimrSettings()
+ .SetAppConfig(appConfig)
+ .SetWithSampleTables(true);
+
+ const std::vector<std::pair<std::string, std::string>> tests = {
+ {"", ""},
+ {"INDEX i GLOBAL ON (Key2),", ""},
+ {"", "INDEX i GLOBAL ON (Key2),"},
+ {"INDEX i GLOBAL ON (Key2),", "INDEX i GLOBAL ON (Key2),"},
+ };
+
+ for (const auto& [index1, index2] : tests) {
+ TKikimrRunner kikimr(settings);
+ {
+ const TString query = std::format(R"(
+ CREATE TABLE Source (
+ Key String,
+ Key2 String,
+ Value String,
+ PRIMARY KEY (Key)
+ );
+
+ CREATE TABLE Dest1 (
+ Key String,
+ Key2 String,
+ Value String,
+ {0}
+ PRIMARY KEY (Key)
+ );
+
+ CREATE TABLE Dest2 (
+ Key String,
+ Key2 String,
+ Value String,
+ {1}
+ PRIMARY KEY (Key)
+ );
+ )", index1, index2);
+
+ auto session = kikimr.GetTableClient().CreateSession().GetValueSync().GetSession();
+ auto result = session.ExecuteSchemeQuery(query).ExtractValueSync();
+ UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+
+ {
+ const TString query = R"(
+ INSERT INTO Source (Key, Key2, Value) VALUES
+ ("1", "test", "");
+ )";
+
+ auto session = kikimr.GetTableClient().CreateSession().GetValueSync().GetSession();
+ auto result = session.ExecuteDataQuery(query, NYdb::NTable::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+
+ {
+ const TString query = R"(
+ $t = (
+ SELECT
+ Key AS Key,
+ Key AS Key2,
+ CAST(RandomUuid(Key) AS String) As Value
+ FROM Source
+ );
+
+ UPSERT INTO Dest1 (
+ SELECT
+ Key AS Key,
+ CAST(RandomUuid(Key) AS String) AS Key2,
+ Value AS Value
+ From $t
+ );
+
+ UPSERT INTO Dest2 (
+ SELECT
+ Key AS Key,
+ CAST(RandomUuid(Key) AS String) AS Key2,
+ Value AS Value
+ From $t
+ );
+ )";
+
+ if (UseDataQuery) {
+ auto session = kikimr.GetTableClient().CreateSession().GetValueSync().GetSession();
+ auto result = session.ExecuteDataQuery(query, NYdb::NTable::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
+ } else {
+ auto result = kikimr.GetQueryClient().ExecuteQuery(query, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+ }
+ {
+ const TString query = R"(
+ SELECT Value FROM Dest1 ORDER BY Value;
+ SELECT Value FROM Dest2 ORDER BY Value;
+ )";
+
+ auto session = kikimr.GetTableClient().CreateSession().GetValueSync().GetSession();
+ auto result = session.ExecuteDataQuery(query, NYdb::NTable::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
+
+ Cerr << FormatResultSetYson(result.GetResultSet(0)) << Endl;
+ Cerr << FormatResultSetYson(result.GetResultSet(1)) << Endl;
+
+ const bool onlyOneIndex = (index1.empty() != index2.empty());
+
+ if (onlyOneIndex && UseSink) {
+ // "with index" uses literal executer (precompute), while "without index" uses compute actor.
+ // TODO:
+ UNIT_ASSERT(FormatResultSetYson(result.GetResultSet(0)) != FormatResultSetYson(result.GetResultSet(1)));
+ } else {
+ UNIT_ASSERT_VALUES_EQUAL(FormatResultSetYson(result.GetResultSet(0)), FormatResultSetYson(result.GetResultSet(1)));
+ }
+ }
+ }
+ }
+
+ Y_UNIT_TEST_QUAD(NamedExpressionRandomUpsertReturning, UseSink, UseDataQuery) {
+ NKikimrConfig::TAppConfig appConfig;
+ appConfig.MutableTableServiceConfig()->SetEnableOltpSink(UseSink);
+ auto settings = TKikimrSettings()
+ .SetAppConfig(appConfig)
+ .SetWithSampleTables(true);
+
+ const std::vector<std::pair<std::string, std::string>> tests = {
+ {"", ""},
+ {"RETURNING Value", ""},
+ {"", "RETURNING Value"},
+ {"RETURNING Value", "RETURNING Value"},
+ };
+
+ for (const auto& [ret1, ret2] : tests) {
+ TKikimrRunner kikimr(settings);
+ {
+ const TString query = R"(
+ CREATE TABLE Source (
+ Key String,
+ Key2 String,
+ Value String,
+ PRIMARY KEY (Key)
+ );
+
+ CREATE TABLE Dest1 (
+ Key String,
+ Key2 String,
+ Value String,
+ PRIMARY KEY (Key)
+ );
+
+ CREATE TABLE Dest2 (
+ Key String,
+ Key2 String,
+ Value String,
+ PRIMARY KEY (Key)
+ );
+ )";
+
+ auto session = kikimr.GetTableClient().CreateSession().GetValueSync().GetSession();
+ auto result = session.ExecuteSchemeQuery(query).ExtractValueSync();
+ UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+
+ {
+ const TString query = R"(
+ INSERT INTO Source (Key, Key2, Value) VALUES
+ ("1", "test", "");
+ )";
+
+ auto session = kikimr.GetTableClient().CreateSession().GetValueSync().GetSession();
+ auto result = session.ExecuteDataQuery(query, NYdb::NTable::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+
+ {
+ const TString query = std::format(R"(
+ $t = (
+ SELECT
+ Key AS Key,
+ Key AS Key2,
+ CAST(RandomUuid(Key) AS String) As Value
+ FROM Source
+ );
+
+ UPSERT INTO Dest1 (
+ SELECT
+ Key AS Key,
+ CAST(RandomUuid(Key) AS String) AS Key2,
+ Value AS Value
+ From $t
+ )
+ {0};
+
+ UPSERT INTO Dest2 (
+ SELECT
+ Key AS Key,
+ CAST(RandomUuid(Key) AS String) AS Key2,
+ Value AS Value
+ From $t
+ )
+ {1};
+ )", ret1, ret2);
+
+ if (UseDataQuery) {
+ auto session = kikimr.GetTableClient().CreateSession().GetValueSync().GetSession();
+ auto result = session.ExecuteDataQuery(query, NYdb::NTable::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
+ } else {
+ auto result = kikimr.GetQueryClient().ExecuteQuery(query, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+ }
+ {
+ const TString query = R"(
+ SELECT Value FROM Dest1 ORDER BY Value;
+ SELECT Value FROM Dest2 ORDER BY Value;
+ )";
+
+ auto session = kikimr.GetTableClient().CreateSession().GetValueSync().GetSession();
+ auto result = session.ExecuteDataQuery(query, NYdb::NTable::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
+
+ Cerr << FormatResultSetYson(result.GetResultSet(0)) << Endl;
+ Cerr << FormatResultSetYson(result.GetResultSet(1)) << Endl;
+
+ UNIT_ASSERT_VALUES_EQUAL(FormatResultSetYson(result.GetResultSet(0)), FormatResultSetYson(result.GetResultSet(1)));
+ }
+ }
+ }
+
+ Y_UNIT_TEST_QUAD(NamedExpressionRandomUpsertRevert, UseSink, UseDataQuery) {
+ NKikimrConfig::TAppConfig appConfig;
+ appConfig.MutableTableServiceConfig()->SetEnableOltpSink(UseSink);
+ auto settings = TKikimrSettings()
+ .SetAppConfig(appConfig)
+ .SetWithSampleTables(true);
+
+ const std::vector<std::string> ops = {"UPSERT", "INSERT", "INSERT OR REVERT"};
+
+ for (const auto& op1 : ops) {
+ for (const auto& op2 : ops) {
+ TKikimrRunner kikimr(settings);
+ {
+ const TString query = R"(
+ CREATE TABLE Source (
+ Key String,
+ Key2 String,
+ Value String,
+ PRIMARY KEY (Key)
+ );
+
+ CREATE TABLE Dest1 (
+ Key String,
+ Key2 String,
+ Value String,
+ PRIMARY KEY (Key)
+ );
+
+ CREATE TABLE Dest2 (
+ Key String,
+ Key2 String,
+ Value String,
+ PRIMARY KEY (Key)
+ );
+ )";
+
+ auto session = kikimr.GetTableClient().CreateSession().GetValueSync().GetSession();
+ auto result = session.ExecuteSchemeQuery(query).ExtractValueSync();
+ UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+
+ {
+ const TString query = R"(
+ INSERT INTO Source (Key, Key2, Value) VALUES
+ ("1", "test", "");
+ )";
+
+ auto session = kikimr.GetTableClient().CreateSession().GetValueSync().GetSession();
+ auto result = session.ExecuteDataQuery(query, NYdb::NTable::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+
+ {
+ const TString query = std::format(R"(
+ $t = (
+ SELECT
+ Key AS Key,
+ Key AS Key2,
+ CAST(RandomUuid(Key) AS String) As Value
+ FROM Source
+ );
+
+ {0} INTO Dest1 (
+ SELECT
+ Key AS Key,
+ CAST(RandomUuid(Key) AS String) AS Key2,
+ Value AS Value
+ From $t
+ );
+
+ {1} INTO Dest2 (
+ SELECT
+ Key AS Key,
+ CAST(RandomUuid(Key) AS String) AS Key2,
+ Value AS Value
+ From $t
+ );
+ )", op1, op2);
+
+ if (UseDataQuery) {
+ auto session = kikimr.GetTableClient().CreateSession().GetValueSync().GetSession();
+ auto result = session.ExecuteDataQuery(query, NYdb::NTable::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
+ } else {
+ auto result = kikimr.GetQueryClient().ExecuteQuery(query, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+ }
+ {
+ const TString query = R"(
+ SELECT Value FROM Dest1 ORDER BY Value;
+ SELECT Value FROM Dest2 ORDER BY Value;
+ )";
+
+ auto session = kikimr.GetTableClient().CreateSession().GetValueSync().GetSession();
+ auto result = session.ExecuteDataQuery(query, NYdb::NTable::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
+
+ Cerr << FormatResultSetYson(result.GetResultSet(0)) << Endl;
+ Cerr << FormatResultSetYson(result.GetResultSet(1)) << Endl;
+
+ const bool onlyOneRevert = (op1 == "INSERT OR REVERT") != (op2 == "INSERT OR REVERT");
+
+ if (onlyOneRevert && UseSink) {
+ // TODO:
+ UNIT_ASSERT(FormatResultSetYson(result.GetResultSet(0)) != FormatResultSetYson(result.GetResultSet(1)));
+ } else {
+ UNIT_ASSERT_VALUES_EQUAL(FormatResultSetYson(result.GetResultSet(0)), FormatResultSetYson(result.GetResultSet(1)));
+ }
+ }
+ }
+ }
+ }
+
+ Y_UNIT_TEST_TWIN(NamedExpressionRandomSelect, UseSink) {
+ NKikimrConfig::TAppConfig appConfig;
+ appConfig.MutableTableServiceConfig()->SetEnableOltpSink(UseSink);
+ auto settings = TKikimrSettings()
+ .SetAppConfig(appConfig)
+ .SetWithSampleTables(true);
+
+ TKikimrRunner kikimr(settings);
+ {
+ const TString query = R"(
+ CREATE TABLE Source (
+ Key String,
+ Key2 String,
+ Value String,
+ PRIMARY KEY (Key)
+ );
+ )";
+
+ auto session = kikimr.GetTableClient().CreateSession().GetValueSync().GetSession();
+ auto result = session.ExecuteSchemeQuery(query).ExtractValueSync();
+ UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+
+ {
+ const TString query = R"(
+ INSERT INTO Source (Key, Key2, Value) VALUES
+ ("1", "test", "");
+ )";
+
+ auto session = kikimr.GetTableClient().CreateSession().GetValueSync().GetSession();
+ auto result = session.ExecuteDataQuery(query, NYdb::NTable::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+
+ {
+ const TString query = R"(
+ $t = (
+ SELECT
+ Key AS Key,
+ Key AS Key2,
+ CAST(RandomUuid(Key) AS String) As Value
+ FROM Source
+ );
+
+ SELECT COUNT(DISTINCT Value) FROM (SELECT * FROM $t UNION ALL SELECT * FROM $t);
+ )";
+
+ auto session = kikimr.GetTableClient().CreateSession().GetValueSync().GetSession();
+ auto result = session.ExecuteDataQuery(query, NYdb::NTable::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
+
+ Cerr << FormatResultSetYson(result.GetResultSet(0)) << Endl;
+
+ UNIT_ASSERT_VALUES_EQUAL(FormatResultSetYson(result.GetResultSet(0)), "[[2u]]");
+ }
+
+ }
}
} // namespace NKikimr::NKqp