aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorNikita Vasilev <ns-vasilev@ydb.tech>2024-01-31 16:33:50 +0300
committerGitHub <noreply@github.com>2024-01-31 16:33:50 +0300
commit059614c447bd1c7751fd8c0c83f7e3c98e71f5dc (patch)
tree236603d376f35d2560d6dfe6aedaef07ea98eefb
parentd4c6fda27f1d4df1b042e33f1f7473ba079c01f2 (diff)
downloadydb-059614c447bd1c7751fd8c0c83f7e3c98e71f5dc.tar.gz
fix sink test (#1468)
-rw-r--r--ydb/core/base/appdata_fwd.h1
-rw-r--r--ydb/core/kqp/executer_actor/kqp_data_executer.cpp2
-rw-r--r--ydb/core/kqp/session_actor/kqp_tx.cpp5
-rw-r--r--ydb/core/kqp/ut/olap/kqp_olap_ut.cpp146
4 files changed, 137 insertions, 17 deletions
diff --git a/ydb/core/base/appdata_fwd.h b/ydb/core/base/appdata_fwd.h
index c565b870e1..c1f49337d0 100644
--- a/ydb/core/base/appdata_fwd.h
+++ b/ydb/core/base/appdata_fwd.h
@@ -236,7 +236,6 @@ struct TAppData {
bool EnableMvccSnapshotWithLegacyDomainRoot = false;
bool UsePartitionStatsCollectorForTests = false;
bool DisableCdcAutoSwitchingToReadyStateForTests = false;
- bool EnableOlapSink = false;
TVector<TString> AdministrationAllowedSIDs; // users/groups which allowed to perform administrative tasks
TVector<TString> DefaultUserSIDs;
TString AllAuthenticatedUsers = "all-users@well-known";
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
index 32ca975492..219396b5cd 100644
--- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
@@ -1711,7 +1711,7 @@ private:
}
if ((stageInfo.Meta.IsOlap() && HasDmlOperationOnOlap(tx.Body->GetType(), stage))
- || (EnableOlapSink && HasOlapSink(stage))) {
+ || (!EnableOlapSink && HasOlapSink(stage))) {
auto error = TStringBuilder() << "Data manipulation queries do not support column shard tables.";
LOG_E(error);
ReplyErrorAndDie(Ydb::StatusIds::PRECONDITION_FAILED,
diff --git a/ydb/core/kqp/session_actor/kqp_tx.cpp b/ydb/core/kqp/session_actor/kqp_tx.cpp
index 2e9bcc6b20..c948ba6ea5 100644
--- a/ydb/core/kqp/session_actor/kqp_tx.cpp
+++ b/ydb/core/kqp/session_actor/kqp_tx.cpp
@@ -207,6 +207,11 @@ bool HasOlapTableWriteInTx(const NKqpProto::TKqpPhyQuery& physicalQuery) {
bool HasOltpTableReadInTx(const NKqpProto::TKqpPhyQuery& physicalQuery) {
for (const auto &tx : physicalQuery.GetTransactions()) {
for (const auto &stage : tx.GetStages()) {
+ for (const auto &source : stage.GetSources()) {
+ if (source.GetTypeCase() == NKqpProto::TKqpSource::kReadRangesSource){
+ return true;
+ }
+ }
for (const auto &tableOp : stage.GetTableOps()) {
switch (tableOp.GetTypeCase()) {
case NKqpProto::TKqpPhyTableOperation::kReadRange:
diff --git a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
index d9f49cd4b0..6f53286948 100644
--- a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
+++ b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
@@ -5549,11 +5549,13 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
testHelper.ReadData("SELECT value FROM `/Root/ColumnTableTest` WHERE id = 1", "[[110]]");
}
- Y_UNIT_TEST(OlapReplace_FromSelect) {
+ Y_UNIT_TEST(OlapReplace_FromSelectSimple) {
+ NKikimrConfig::TAppConfig appConfig;
+ appConfig.MutableTableServiceConfig()->SetEnableOlapSink(true);
auto settings = TKikimrSettings()
+ .SetAppConfig(appConfig)
.SetWithSampleTables(false);
TKikimrRunner kikimr(settings);
- kikimr.GetTestServer().GetRuntime()->GetAppData(0).EnableOlapSink = true;
Tests::NCommon::TLoggerInit(kikimr).Initialize();
TTableWithNullsHelper(kikimr).CreateTableWithNulls();
@@ -5662,18 +5664,76 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
output,
R"([[1u;["test1"];10];[2u;["test2"];11];[3u;["test3"];12];[4u;#;13]])");
}
+ }
+
+ Y_UNIT_TEST(OlapReplace_BadTransactions) {
+ NKikimrConfig::TAppConfig appConfig;
+ appConfig.MutableTableServiceConfig()->SetEnableOlapSink(true);
+ auto settings = TKikimrSettings()
+ .SetAppConfig(appConfig)
+ .SetWithSampleTables(false);
+ TKikimrRunner kikimr(settings);
+ Tests::NCommon::TLoggerInit(kikimr).Initialize();
+ TTableWithNullsHelper(kikimr).CreateTableWithNulls();
+
+ auto session = kikimr.GetTableClient().CreateSession().GetValueSync().GetSession();
+ const TString query = R"(
+ CREATE TABLE `/Root/ColumnShard` (
+ Col1 Uint64 NOT NULL,
+ Col2 String,
+ Col3 Int32 NOT NULL,
+ PRIMARY KEY (Col1)
+ )
+ PARTITION BY HASH(Col1)
+ WITH (STORE = COLUMN, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 10);
+
+ CREATE TABLE `/Root/DataShard` (
+ Col1 Uint64 NOT NULL,
+ Col2 String,
+ Col3 Int32 NOT NULL,
+ PRIMARY KEY (Col1)
+ )
+ WITH (UNIFORM_PARTITIONS = 2, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 2);
+ )";
+
+ auto result = session.ExecuteSchemeQuery(query).GetValueSync();
+ UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
+
+ auto client = kikimr.GetQueryClient();
{
auto prepareResult = client.ExecuteQuery(R"(
- REPLACE INTO `/Root/DataShard1` (Col1, Col2, Col3) VALUES
+ REPLACE INTO `/Root/ColumnShard` (Col1, Col2, Col3) VALUES
(1u, "test1", 10), (2u, "test2", 11), (3u, "test3", 12), (4u, NULL, 13);
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_C(prepareResult.IsSuccess(), prepareResult.GetIssues().ToString());
+ }
+ {
+ auto prepareResult = client.ExecuteQuery(R"(
+ REPLACE INTO `/Root/DataShard` (Col1, Col2, Col3) VALUES
+ (10u, "test1", 10), (20u, "test2", 11), (30u, "test3", 12), (40u, NULL, 13);
+ )", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_C(prepareResult.IsSuccess(), prepareResult.GetIssues().ToString());
+ }
+
+ {
+ // column -> row
+ const TString sql = R"(
+ REPLACE INTO `/Root/DataShard`
+ SELECT * FROM `/Root/ColumnShard`
+ )";
+ auto insertResult = client.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
+ UNIT_ASSERT(!insertResult.IsSuccess());
+ UNIT_ASSERT_C(
+ insertResult.GetIssues().ToString().Contains("Transactions between column and row tables are disabled at current time"),
+ insertResult.GetIssues().ToString());
+ }
+ {
// row -> column
const TString sql = R"(
- REPLACE INTO `/Root/ColumnShard3`
- SELECT * FROM `/Root/DataShard1`
+ REPLACE INTO `/Root/ColumnShard`
+ SELECT * FROM `/Root/DataShard`
)";
auto insertResult = client.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
UNIT_ASSERT(!insertResult.IsSuccess());
@@ -5683,10 +5743,53 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
}
{
- // column -> row
+ // column & row read
const TString sql = R"(
- REPLACE INTO `/Root/DataShard2`
- SELECT * FROM `/Root/ColumnSource`
+ SELECT * FROM `/Root/DataShard`;
+ SELECT * FROM `/Root/ColumnShard`;
+ )";
+ auto insertResult = client.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
+ UNIT_ASSERT(!insertResult.IsSuccess());
+ UNIT_ASSERT_C(
+ insertResult.GetIssues().ToString().Contains("Transactions between column and row tables are disabled at current time"),
+ insertResult.GetIssues().ToString());
+ }
+
+ {
+ // column & row write
+ const TString sql = R"(
+ REPLACE INTO `/Root/DataShard` (Col1, Col2, Col3) VALUES
+ (1u, "test1", 10), (2u, "test2", 11), (3u, "test3", 12), (4u, NULL, 13);
+ REPLACE INTO `/Root/ColumnShard` (Col1, Col2, Col3) VALUES
+ (1u, "test1", 10), (2u, "test2", 11), (3u, "test3", 12), (4u, NULL, 13);
+ )";
+ auto insertResult = client.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
+ UNIT_ASSERT(!insertResult.IsSuccess());
+ UNIT_ASSERT_C(
+ insertResult.GetIssues().ToString().Contains("Transactions between column and row tables are disabled at current time"),
+ insertResult.GetIssues().ToString());
+ }
+
+ {
+ // column read & row write
+ const TString sql = R"(
+ REPLACE INTO `/Root/DataShard` (Col1, Col2, Col3) VALUES
+ (1u, "test1", 10), (2u, "test2", 11), (3u, "test3", 12), (4u, NULL, 13);
+ SELECT * FROM `/Root/ColumnShard`;
+ )";
+ auto insertResult = client.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
+ UNIT_ASSERT(!insertResult.IsSuccess());
+ UNIT_ASSERT_C(
+ insertResult.GetIssues().ToString().Contains("Transactions between column and row tables are disabled at current time"),
+ insertResult.GetIssues().ToString());
+ }
+
+ {
+ // column write & row read
+ const TString sql = R"(
+ REPLACE INTO `/Root/ColumnShard` (Col1, Col2, Col3) VALUES
+ (1u, "test1", 10), (2u, "test2", 11), (3u, "test3", 12), (4u, NULL, 13);
+ SELECT * FROM `/Root/DataShard`;
)";
auto insertResult = client.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
UNIT_ASSERT(!insertResult.IsSuccess());
@@ -5697,13 +5800,15 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
}
Y_UNIT_TEST(OlapReplace_FromSelectLarge) {
+ NKikimrConfig::TAppConfig appConfig;
+ appConfig.MutableTableServiceConfig()->SetEnableOlapSink(true);
auto settings = TKikimrSettings()
+ .SetAppConfig(appConfig)
.SetWithSampleTables(false);
TTestHelper testHelper(settings);
TKikimrRunner& kikimr = testHelper.GetKikimr();
- testHelper.GetRuntime().GetAppData(0).EnableOlapSink = true;
Tests::NCommon::TLoggerInit(kikimr).Initialize();
TVector<TTestHelper::TColumnSchema> schema = {
@@ -5712,11 +5817,11 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
};
TTestHelper::TColumnTable testTable1;
- testTable1.SetName("/Root/ColumnShard1").SetPrimaryKey({ "Col1" }).SetSharding({ "Col1" }).SetSchema(schema);
+ testTable1.SetName("/Root/ColumnShard1").SetPrimaryKey({ "Col1" }).SetSharding({ "Col1" }).SetSchema(schema).SetMinPartitionsCount(1000);
testHelper.CreateTable(testTable1);
TTestHelper::TColumnTable testTable2;
- testTable2.SetName("/Root/ColumnShard2").SetPrimaryKey({ "Col1" }).SetSharding({ "Col1" }).SetSchema(schema);
+ testTable2.SetName("/Root/ColumnShard2").SetPrimaryKey({ "Col1" }).SetSharding({ "Col1" }).SetSchema(schema).SetMinPartitionsCount(1000);
testHelper.CreateTable(testTable2);
{
@@ -5750,10 +5855,12 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
}
Y_UNIT_TEST(OlapReplace_Simple) {
+ NKikimrConfig::TAppConfig appConfig;
+ appConfig.MutableTableServiceConfig()->SetEnableOlapSink(true);
auto settings = TKikimrSettings()
+ .SetAppConfig(appConfig)
.SetWithSampleTables(false);
TKikimrRunner kikimr(settings);
- kikimr.GetTestServer().GetRuntime()->GetAppData(0).EnableOlapSink = true;
Tests::NCommon::TLoggerInit(kikimr).Initialize();
TTableWithNullsHelper(kikimr).CreateTableWithNulls();
@@ -5801,10 +5908,13 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
}
Y_UNIT_TEST(OlapReplace_InsertUpsertError) {
+ NKikimrConfig::TAppConfig appConfig;
+ appConfig.MutableTableServiceConfig()->SetEnableOlapSink(true);
auto settings = TKikimrSettings()
+ .SetAppConfig(appConfig)
.SetWithSampleTables(false);
+
TKikimrRunner kikimr(settings);
- kikimr.GetTestServer().GetRuntime()->GetAppData(0).EnableOlapSink = true;
Tests::NCommon::TLoggerInit(kikimr).Initialize();
TTableWithNullsHelper(kikimr).CreateTableWithNulls();
@@ -5855,10 +5965,13 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
}
Y_UNIT_TEST(OlapReplace_Duplicates) {
+ NKikimrConfig::TAppConfig appConfig;
+ appConfig.MutableTableServiceConfig()->SetEnableOlapSink(true);
auto settings = TKikimrSettings()
+ .SetAppConfig(appConfig)
.SetWithSampleTables(false);
+
TKikimrRunner kikimr(settings);
- kikimr.GetTestServer().GetRuntime()->GetAppData(0).EnableOlapSink = true;
Tests::NCommon::TLoggerInit(kikimr).Initialize();
TTableWithNullsHelper(kikimr).CreateTableWithNulls();
@@ -5903,10 +6016,13 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
}
Y_UNIT_TEST(OlapReplace_DisableOlapSink) {
+ NKikimrConfig::TAppConfig appConfig;
+ appConfig.MutableTableServiceConfig()->SetEnableOlapSink(false);
auto settings = TKikimrSettings()
+ .SetAppConfig(appConfig)
.SetWithSampleTables(false);
+
TKikimrRunner kikimr(settings);
- kikimr.GetTestServer().GetRuntime()->GetAppData(0).EnableOlapSink = false;
Tests::NCommon::TLoggerInit(kikimr).Initialize();
TTableWithNullsHelper(kikimr).CreateTableWithNulls();