diff options
author | Nikita Vasilev <ns-vasilev@ydb.tech> | 2025-04-22 20:00:40 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-04-22 20:00:40 +0300 |
commit | c9d786654e44f473624bd81ee017dea6559e80b1 (patch) | |
tree | e9602024387c30c6876dab78b96ccc6cc8be9269 | |
parent | 7a3575ff900bc1ec9529d4b8deea55ffab1335a5 (diff) | |
download | ydb-c9d786654e44f473624bd81ee017dea6559e80b1.tar.gz |
Tests for named expressions with sinks (#17540)
-rw-r--r-- | ydb/core/kqp/ut/opt/kqp_named_expressions_ut.cpp | 404 |
1 files changed, 404 insertions, 0 deletions
diff --git a/ydb/core/kqp/ut/opt/kqp_named_expressions_ut.cpp b/ydb/core/kqp/ut/opt/kqp_named_expressions_ut.cpp index 3eac0573cfc..63f86347b30 100644 --- a/ydb/core/kqp/ut/opt/kqp_named_expressions_ut.cpp +++ b/ydb/core/kqp/ut/opt/kqp_named_expressions_ut.cpp @@ -461,6 +461,410 @@ Y_UNIT_TEST_SUITE(KqpNamedExpressions) { } } } + + Y_UNIT_TEST_QUAD(NamedExpressionRandomUpsertIndex, UseSink, UseDataQuery) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableOltpSink(UseSink); + auto settings = TKikimrSettings() + .SetAppConfig(appConfig) + .SetWithSampleTables(true); + + const std::vector<std::pair<std::string, std::string>> tests = { + {"", ""}, + {"INDEX i GLOBAL ON (Key2),", ""}, + {"", "INDEX i GLOBAL ON (Key2),"}, + {"INDEX i GLOBAL ON (Key2),", "INDEX i GLOBAL ON (Key2),"}, + }; + + for (const auto& [index1, index2] : tests) { + TKikimrRunner kikimr(settings); + { + const TString query = std::format(R"( + CREATE TABLE Source ( + Key String, + Key2 String, + Value String, + PRIMARY KEY (Key) + ); + + CREATE TABLE Dest1 ( + Key String, + Key2 String, + Value String, + {0} + PRIMARY KEY (Key) + ); + + CREATE TABLE Dest2 ( + Key String, + Key2 String, + Value String, + {1} + PRIMARY KEY (Key) + ); + )", index1, index2); + + auto session = kikimr.GetTableClient().CreateSession().GetValueSync().GetSession(); + auto result = session.ExecuteSchemeQuery(query).ExtractValueSync(); + UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + } + + { + const TString query = R"( + INSERT INTO Source (Key, Key2, Value) VALUES + ("1", "test", ""); + )"; + + auto session = kikimr.GetTableClient().CreateSession().GetValueSync().GetSession(); + auto result = session.ExecuteDataQuery(query, NYdb::NTable::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + } + + { + const TString query = R"( + $t = ( + SELECT + Key AS Key, + Key AS Key2, + CAST(RandomUuid(Key) AS String) As Value + FROM Source + ); + + UPSERT INTO Dest1 ( + SELECT + Key AS Key, + CAST(RandomUuid(Key) AS String) AS Key2, + Value AS Value + From $t + ); + + UPSERT INTO Dest2 ( + SELECT + Key AS Key, + CAST(RandomUuid(Key) AS String) AS Key2, + Value AS Value + From $t + ); + )"; + + if (UseDataQuery) { + auto session = kikimr.GetTableClient().CreateSession().GetValueSync().GetSession(); + auto result = session.ExecuteDataQuery(query, NYdb::NTable::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + } else { + auto result = kikimr.GetQueryClient().ExecuteQuery(query, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + } + } + { + const TString query = R"( + SELECT Value FROM Dest1 ORDER BY Value; + SELECT Value FROM Dest2 ORDER BY Value; + )"; + + auto session = kikimr.GetTableClient().CreateSession().GetValueSync().GetSession(); + auto result = session.ExecuteDataQuery(query, NYdb::NTable::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + + Cerr << FormatResultSetYson(result.GetResultSet(0)) << Endl; + Cerr << FormatResultSetYson(result.GetResultSet(1)) << Endl; + + const bool onlyOneIndex = (index1.empty() != index2.empty()); + + if (onlyOneIndex && UseSink) { + // "with index" uses literal executer (precompute), while "without index" uses compute actor. + // TODO: + UNIT_ASSERT(FormatResultSetYson(result.GetResultSet(0)) != FormatResultSetYson(result.GetResultSet(1))); + } else { + UNIT_ASSERT_VALUES_EQUAL(FormatResultSetYson(result.GetResultSet(0)), FormatResultSetYson(result.GetResultSet(1))); + } + } + } + } + + Y_UNIT_TEST_QUAD(NamedExpressionRandomUpsertReturning, UseSink, UseDataQuery) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableOltpSink(UseSink); + auto settings = TKikimrSettings() + .SetAppConfig(appConfig) + .SetWithSampleTables(true); + + const std::vector<std::pair<std::string, std::string>> tests = { + {"", ""}, + {"RETURNING Value", ""}, + {"", "RETURNING Value"}, + {"RETURNING Value", "RETURNING Value"}, + }; + + for (const auto& [ret1, ret2] : tests) { + TKikimrRunner kikimr(settings); + { + const TString query = R"( + CREATE TABLE Source ( + Key String, + Key2 String, + Value String, + PRIMARY KEY (Key) + ); + + CREATE TABLE Dest1 ( + Key String, + Key2 String, + Value String, + PRIMARY KEY (Key) + ); + + CREATE TABLE Dest2 ( + Key String, + Key2 String, + Value String, + PRIMARY KEY (Key) + ); + )"; + + auto session = kikimr.GetTableClient().CreateSession().GetValueSync().GetSession(); + auto result = session.ExecuteSchemeQuery(query).ExtractValueSync(); + UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + } + + { + const TString query = R"( + INSERT INTO Source (Key, Key2, Value) VALUES + ("1", "test", ""); + )"; + + auto session = kikimr.GetTableClient().CreateSession().GetValueSync().GetSession(); + auto result = session.ExecuteDataQuery(query, NYdb::NTable::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + } + + { + const TString query = std::format(R"( + $t = ( + SELECT + Key AS Key, + Key AS Key2, + CAST(RandomUuid(Key) AS String) As Value + FROM Source + ); + + UPSERT INTO Dest1 ( + SELECT + Key AS Key, + CAST(RandomUuid(Key) AS String) AS Key2, + Value AS Value + From $t + ) + {0}; + + UPSERT INTO Dest2 ( + SELECT + Key AS Key, + CAST(RandomUuid(Key) AS String) AS Key2, + Value AS Value + From $t + ) + {1}; + )", ret1, ret2); + + if (UseDataQuery) { + auto session = kikimr.GetTableClient().CreateSession().GetValueSync().GetSession(); + auto result = session.ExecuteDataQuery(query, NYdb::NTable::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + } else { + auto result = kikimr.GetQueryClient().ExecuteQuery(query, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + } + } + { + const TString query = R"( + SELECT Value FROM Dest1 ORDER BY Value; + SELECT Value FROM Dest2 ORDER BY Value; + )"; + + auto session = kikimr.GetTableClient().CreateSession().GetValueSync().GetSession(); + auto result = session.ExecuteDataQuery(query, NYdb::NTable::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + + Cerr << FormatResultSetYson(result.GetResultSet(0)) << Endl; + Cerr << FormatResultSetYson(result.GetResultSet(1)) << Endl; + + UNIT_ASSERT_VALUES_EQUAL(FormatResultSetYson(result.GetResultSet(0)), FormatResultSetYson(result.GetResultSet(1))); + } + } + } + + Y_UNIT_TEST_QUAD(NamedExpressionRandomUpsertRevert, UseSink, UseDataQuery) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableOltpSink(UseSink); + auto settings = TKikimrSettings() + .SetAppConfig(appConfig) + .SetWithSampleTables(true); + + const std::vector<std::string> ops = {"UPSERT", "INSERT", "INSERT OR REVERT"}; + + for (const auto& op1 : ops) { + for (const auto& op2 : ops) { + TKikimrRunner kikimr(settings); + { + const TString query = R"( + CREATE TABLE Source ( + Key String, + Key2 String, + Value String, + PRIMARY KEY (Key) + ); + + CREATE TABLE Dest1 ( + Key String, + Key2 String, + Value String, + PRIMARY KEY (Key) + ); + + CREATE TABLE Dest2 ( + Key String, + Key2 String, + Value String, + PRIMARY KEY (Key) + ); + )"; + + auto session = kikimr.GetTableClient().CreateSession().GetValueSync().GetSession(); + auto result = session.ExecuteSchemeQuery(query).ExtractValueSync(); + UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + } + + { + const TString query = R"( + INSERT INTO Source (Key, Key2, Value) VALUES + ("1", "test", ""); + )"; + + auto session = kikimr.GetTableClient().CreateSession().GetValueSync().GetSession(); + auto result = session.ExecuteDataQuery(query, NYdb::NTable::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + } + + { + const TString query = std::format(R"( + $t = ( + SELECT + Key AS Key, + Key AS Key2, + CAST(RandomUuid(Key) AS String) As Value + FROM Source + ); + + {0} INTO Dest1 ( + SELECT + Key AS Key, + CAST(RandomUuid(Key) AS String) AS Key2, + Value AS Value + From $t + ); + + {1} INTO Dest2 ( + SELECT + Key AS Key, + CAST(RandomUuid(Key) AS String) AS Key2, + Value AS Value + From $t + ); + )", op1, op2); + + if (UseDataQuery) { + auto session = kikimr.GetTableClient().CreateSession().GetValueSync().GetSession(); + auto result = session.ExecuteDataQuery(query, NYdb::NTable::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + } else { + auto result = kikimr.GetQueryClient().ExecuteQuery(query, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + } + } + { + const TString query = R"( + SELECT Value FROM Dest1 ORDER BY Value; + SELECT Value FROM Dest2 ORDER BY Value; + )"; + + auto session = kikimr.GetTableClient().CreateSession().GetValueSync().GetSession(); + auto result = session.ExecuteDataQuery(query, NYdb::NTable::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + + Cerr << FormatResultSetYson(result.GetResultSet(0)) << Endl; + Cerr << FormatResultSetYson(result.GetResultSet(1)) << Endl; + + const bool onlyOneRevert = (op1 == "INSERT OR REVERT") != (op2 == "INSERT OR REVERT"); + + if (onlyOneRevert && UseSink) { + // TODO: + UNIT_ASSERT(FormatResultSetYson(result.GetResultSet(0)) != FormatResultSetYson(result.GetResultSet(1))); + } else { + UNIT_ASSERT_VALUES_EQUAL(FormatResultSetYson(result.GetResultSet(0)), FormatResultSetYson(result.GetResultSet(1))); + } + } + } + } + } + + Y_UNIT_TEST_TWIN(NamedExpressionRandomSelect, UseSink) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableOltpSink(UseSink); + auto settings = TKikimrSettings() + .SetAppConfig(appConfig) + .SetWithSampleTables(true); + + TKikimrRunner kikimr(settings); + { + const TString query = R"( + CREATE TABLE Source ( + Key String, + Key2 String, + Value String, + PRIMARY KEY (Key) + ); + )"; + + auto session = kikimr.GetTableClient().CreateSession().GetValueSync().GetSession(); + auto result = session.ExecuteSchemeQuery(query).ExtractValueSync(); + UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + } + + { + const TString query = R"( + INSERT INTO Source (Key, Key2, Value) VALUES + ("1", "test", ""); + )"; + + auto session = kikimr.GetTableClient().CreateSession().GetValueSync().GetSession(); + auto result = session.ExecuteDataQuery(query, NYdb::NTable::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + } + + { + const TString query = R"( + $t = ( + SELECT + Key AS Key, + Key AS Key2, + CAST(RandomUuid(Key) AS String) As Value + FROM Source + ); + + SELECT COUNT(DISTINCT Value) FROM (SELECT * FROM $t UNION ALL SELECT * FROM $t); + )"; + + auto session = kikimr.GetTableClient().CreateSession().GetValueSync().GetSession(); + auto result = session.ExecuteDataQuery(query, NYdb::NTable::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + + Cerr << FormatResultSetYson(result.GetResultSet(0)) << Endl; + + UNIT_ASSERT_VALUES_EQUAL(FormatResultSetYson(result.GetResultSet(0)), "[[2u]]"); + } + + } } } // namespace NKikimr::NKqp |