diff options
author | Nikita Vasilev <ns-vasilev@ydb.tech> | 2024-01-31 16:33:50 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-01-31 16:33:50 +0300 |
commit | 059614c447bd1c7751fd8c0c83f7e3c98e71f5dc (patch) | |
tree | 236603d376f35d2560d6dfe6aedaef07ea98eefb | |
parent | d4c6fda27f1d4df1b042e33f1f7473ba079c01f2 (diff) | |
download | ydb-059614c447bd1c7751fd8c0c83f7e3c98e71f5dc.tar.gz |
fix sink test (#1468)
-rw-r--r-- | ydb/core/base/appdata_fwd.h | 1 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_data_executer.cpp | 2 | ||||
-rw-r--r-- | ydb/core/kqp/session_actor/kqp_tx.cpp | 5 | ||||
-rw-r--r-- | ydb/core/kqp/ut/olap/kqp_olap_ut.cpp | 146 |
4 files changed, 137 insertions, 17 deletions
diff --git a/ydb/core/base/appdata_fwd.h b/ydb/core/base/appdata_fwd.h index c565b870e1..c1f49337d0 100644 --- a/ydb/core/base/appdata_fwd.h +++ b/ydb/core/base/appdata_fwd.h @@ -236,7 +236,6 @@ struct TAppData { bool EnableMvccSnapshotWithLegacyDomainRoot = false; bool UsePartitionStatsCollectorForTests = false; bool DisableCdcAutoSwitchingToReadyStateForTests = false; - bool EnableOlapSink = false; TVector<TString> AdministrationAllowedSIDs; // users/groups which allowed to perform administrative tasks TVector<TString> DefaultUserSIDs; TString AllAuthenticatedUsers = "all-users@well-known"; diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index 32ca975492..219396b5cd 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -1711,7 +1711,7 @@ private: } if ((stageInfo.Meta.IsOlap() && HasDmlOperationOnOlap(tx.Body->GetType(), stage)) - || (EnableOlapSink && HasOlapSink(stage))) { + || (!EnableOlapSink && HasOlapSink(stage))) { auto error = TStringBuilder() << "Data manipulation queries do not support column shard tables."; LOG_E(error); ReplyErrorAndDie(Ydb::StatusIds::PRECONDITION_FAILED, diff --git a/ydb/core/kqp/session_actor/kqp_tx.cpp b/ydb/core/kqp/session_actor/kqp_tx.cpp index 2e9bcc6b20..c948ba6ea5 100644 --- a/ydb/core/kqp/session_actor/kqp_tx.cpp +++ b/ydb/core/kqp/session_actor/kqp_tx.cpp @@ -207,6 +207,11 @@ bool HasOlapTableWriteInTx(const NKqpProto::TKqpPhyQuery& physicalQuery) { bool HasOltpTableReadInTx(const NKqpProto::TKqpPhyQuery& physicalQuery) { for (const auto &tx : physicalQuery.GetTransactions()) { for (const auto &stage : tx.GetStages()) { + for (const auto &source : stage.GetSources()) { + if (source.GetTypeCase() == NKqpProto::TKqpSource::kReadRangesSource){ + return true; + } + } for (const auto &tableOp : stage.GetTableOps()) { switch (tableOp.GetTypeCase()) { case NKqpProto::TKqpPhyTableOperation::kReadRange: diff --git a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp index d9f49cd4b0..6f53286948 100644 --- a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp +++ b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp @@ -5549,11 +5549,13 @@ Y_UNIT_TEST_SUITE(KqpOlap) { testHelper.ReadData("SELECT value FROM `/Root/ColumnTableTest` WHERE id = 1", "[[110]]"); } - Y_UNIT_TEST(OlapReplace_FromSelect) { + Y_UNIT_TEST(OlapReplace_FromSelectSimple) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableOlapSink(true); auto settings = TKikimrSettings() + .SetAppConfig(appConfig) .SetWithSampleTables(false); TKikimrRunner kikimr(settings); - kikimr.GetTestServer().GetRuntime()->GetAppData(0).EnableOlapSink = true; Tests::NCommon::TLoggerInit(kikimr).Initialize(); TTableWithNullsHelper(kikimr).CreateTableWithNulls(); @@ -5662,18 +5664,76 @@ Y_UNIT_TEST_SUITE(KqpOlap) { output, R"([[1u;["test1"];10];[2u;["test2"];11];[3u;["test3"];12];[4u;#;13]])"); } + } + + Y_UNIT_TEST(OlapReplace_BadTransactions) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableOlapSink(true); + auto settings = TKikimrSettings() + .SetAppConfig(appConfig) + .SetWithSampleTables(false); + TKikimrRunner kikimr(settings); + Tests::NCommon::TLoggerInit(kikimr).Initialize(); + TTableWithNullsHelper(kikimr).CreateTableWithNulls(); + + auto session = kikimr.GetTableClient().CreateSession().GetValueSync().GetSession(); + const TString query = R"( + CREATE TABLE `/Root/ColumnShard` ( + Col1 Uint64 NOT NULL, + Col2 String, + Col3 Int32 NOT NULL, + PRIMARY KEY (Col1) + ) + PARTITION BY HASH(Col1) + WITH (STORE = COLUMN, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 10); + + CREATE TABLE `/Root/DataShard` ( + Col1 Uint64 NOT NULL, + Col2 String, + Col3 Int32 NOT NULL, + PRIMARY KEY (Col1) + ) + WITH (UNIFORM_PARTITIONS = 2, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 2); + )"; + + auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + + auto client = kikimr.GetQueryClient(); { auto prepareResult = client.ExecuteQuery(R"( - REPLACE INTO `/Root/DataShard1` (Col1, Col2, Col3) VALUES + REPLACE INTO `/Root/ColumnShard` (Col1, Col2, Col3) VALUES (1u, "test1", 10), (2u, "test2", 11), (3u, "test3", 12), (4u, NULL, 13); )", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); UNIT_ASSERT_C(prepareResult.IsSuccess(), prepareResult.GetIssues().ToString()); + } + { + auto prepareResult = client.ExecuteQuery(R"( + REPLACE INTO `/Root/DataShard` (Col1, Col2, Col3) VALUES + (10u, "test1", 10), (20u, "test2", 11), (30u, "test3", 12), (40u, NULL, 13); + )", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_C(prepareResult.IsSuccess(), prepareResult.GetIssues().ToString()); + } + + { + // column -> row + const TString sql = R"( + REPLACE INTO `/Root/DataShard` + SELECT * FROM `/Root/ColumnShard` + )"; + auto insertResult = client.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync(); + UNIT_ASSERT(!insertResult.IsSuccess()); + UNIT_ASSERT_C( + insertResult.GetIssues().ToString().Contains("Transactions between column and row tables are disabled at current time"), + insertResult.GetIssues().ToString()); + } + { // row -> column const TString sql = R"( - REPLACE INTO `/Root/ColumnShard3` - SELECT * FROM `/Root/DataShard1` + REPLACE INTO `/Root/ColumnShard` + SELECT * FROM `/Root/DataShard` )"; auto insertResult = client.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync(); UNIT_ASSERT(!insertResult.IsSuccess()); @@ -5683,10 +5743,53 @@ Y_UNIT_TEST_SUITE(KqpOlap) { } { - // column -> row + // column & row read const TString sql = R"( - REPLACE INTO `/Root/DataShard2` - SELECT * FROM `/Root/ColumnSource` + SELECT * FROM `/Root/DataShard`; + SELECT * FROM `/Root/ColumnShard`; + )"; + auto insertResult = client.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync(); + UNIT_ASSERT(!insertResult.IsSuccess()); + UNIT_ASSERT_C( + insertResult.GetIssues().ToString().Contains("Transactions between column and row tables are disabled at current time"), + insertResult.GetIssues().ToString()); + } + + { + // column & row write + const TString sql = R"( + REPLACE INTO `/Root/DataShard` (Col1, Col2, Col3) VALUES + (1u, "test1", 10), (2u, "test2", 11), (3u, "test3", 12), (4u, NULL, 13); + REPLACE INTO `/Root/ColumnShard` (Col1, Col2, Col3) VALUES + (1u, "test1", 10), (2u, "test2", 11), (3u, "test3", 12), (4u, NULL, 13); + )"; + auto insertResult = client.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync(); + UNIT_ASSERT(!insertResult.IsSuccess()); + UNIT_ASSERT_C( + insertResult.GetIssues().ToString().Contains("Transactions between column and row tables are disabled at current time"), + insertResult.GetIssues().ToString()); + } + + { + // column read & row write + const TString sql = R"( + REPLACE INTO `/Root/DataShard` (Col1, Col2, Col3) VALUES + (1u, "test1", 10), (2u, "test2", 11), (3u, "test3", 12), (4u, NULL, 13); + SELECT * FROM `/Root/ColumnShard`; + )"; + auto insertResult = client.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync(); + UNIT_ASSERT(!insertResult.IsSuccess()); + UNIT_ASSERT_C( + insertResult.GetIssues().ToString().Contains("Transactions between column and row tables are disabled at current time"), + insertResult.GetIssues().ToString()); + } + + { + // column write & row read + const TString sql = R"( + REPLACE INTO `/Root/ColumnShard` (Col1, Col2, Col3) VALUES + (1u, "test1", 10), (2u, "test2", 11), (3u, "test3", 12), (4u, NULL, 13); + SELECT * FROM `/Root/DataShard`; )"; auto insertResult = client.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync(); UNIT_ASSERT(!insertResult.IsSuccess()); @@ -5697,13 +5800,15 @@ Y_UNIT_TEST_SUITE(KqpOlap) { } Y_UNIT_TEST(OlapReplace_FromSelectLarge) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableOlapSink(true); auto settings = TKikimrSettings() + .SetAppConfig(appConfig) .SetWithSampleTables(false); TTestHelper testHelper(settings); TKikimrRunner& kikimr = testHelper.GetKikimr(); - testHelper.GetRuntime().GetAppData(0).EnableOlapSink = true; Tests::NCommon::TLoggerInit(kikimr).Initialize(); TVector<TTestHelper::TColumnSchema> schema = { @@ -5712,11 +5817,11 @@ Y_UNIT_TEST_SUITE(KqpOlap) { }; TTestHelper::TColumnTable testTable1; - testTable1.SetName("/Root/ColumnShard1").SetPrimaryKey({ "Col1" }).SetSharding({ "Col1" }).SetSchema(schema); + testTable1.SetName("/Root/ColumnShard1").SetPrimaryKey({ "Col1" }).SetSharding({ "Col1" }).SetSchema(schema).SetMinPartitionsCount(1000); testHelper.CreateTable(testTable1); TTestHelper::TColumnTable testTable2; - testTable2.SetName("/Root/ColumnShard2").SetPrimaryKey({ "Col1" }).SetSharding({ "Col1" }).SetSchema(schema); + testTable2.SetName("/Root/ColumnShard2").SetPrimaryKey({ "Col1" }).SetSharding({ "Col1" }).SetSchema(schema).SetMinPartitionsCount(1000); testHelper.CreateTable(testTable2); { @@ -5750,10 +5855,12 @@ Y_UNIT_TEST_SUITE(KqpOlap) { } Y_UNIT_TEST(OlapReplace_Simple) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableOlapSink(true); auto settings = TKikimrSettings() + .SetAppConfig(appConfig) .SetWithSampleTables(false); TKikimrRunner kikimr(settings); - kikimr.GetTestServer().GetRuntime()->GetAppData(0).EnableOlapSink = true; Tests::NCommon::TLoggerInit(kikimr).Initialize(); TTableWithNullsHelper(kikimr).CreateTableWithNulls(); @@ -5801,10 +5908,13 @@ Y_UNIT_TEST_SUITE(KqpOlap) { } Y_UNIT_TEST(OlapReplace_InsertUpsertError) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableOlapSink(true); auto settings = TKikimrSettings() + .SetAppConfig(appConfig) .SetWithSampleTables(false); + TKikimrRunner kikimr(settings); - kikimr.GetTestServer().GetRuntime()->GetAppData(0).EnableOlapSink = true; Tests::NCommon::TLoggerInit(kikimr).Initialize(); TTableWithNullsHelper(kikimr).CreateTableWithNulls(); @@ -5855,10 +5965,13 @@ Y_UNIT_TEST_SUITE(KqpOlap) { } Y_UNIT_TEST(OlapReplace_Duplicates) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableOlapSink(true); auto settings = TKikimrSettings() + .SetAppConfig(appConfig) .SetWithSampleTables(false); + TKikimrRunner kikimr(settings); - kikimr.GetTestServer().GetRuntime()->GetAppData(0).EnableOlapSink = true; Tests::NCommon::TLoggerInit(kikimr).Initialize(); TTableWithNullsHelper(kikimr).CreateTableWithNulls(); @@ -5903,10 +6016,13 @@ Y_UNIT_TEST_SUITE(KqpOlap) { } Y_UNIT_TEST(OlapReplace_DisableOlapSink) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableOlapSink(false); auto settings = TKikimrSettings() + .SetAppConfig(appConfig) .SetWithSampleTables(false); + TKikimrRunner kikimr(settings); - kikimr.GetTestServer().GetRuntime()->GetAppData(0).EnableOlapSink = false; Tests::NCommon::TLoggerInit(kikimr).Initialize(); TTableWithNullsHelper(kikimr).CreateTableWithNulls(); |