aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorОлег <150132506+iddqdex@users.noreply.github.com>2024-01-19 00:20:37 +0300
committerGitHub <noreply@github.com>2024-01-19 00:20:37 +0300
commit9e04a3a768f5f8c84c2da9fb492af54c87e53d56 (patch)
tree7e87449b99dcdb3532d0e3362a94250e33876483
parent63fd59847024a876a7e8892b0e508edf709a1b2a (diff)
downloadydb-9e04a3a768f5f8c84c2da9fb492af54c87e53d56.tar.gz
KIKIMR-20778: Тест на лаг обновления данных через bulk_upsert (#1143)
-rw-r--r--ydb/core/kqp/ut/olap/kqp_olap_ut.cpp40
1 files changed, 39 insertions, 1 deletions
diff --git a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
index 12408595b2..9bbfdecf7f 100644
--- a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
+++ b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
@@ -534,8 +534,9 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
void WriteTestData(TKikimrRunner& kikimr, TString testTable, ui64 pathIdBegin, ui64 tsBegin, size_t rowCount, bool withSomeNulls = false) {
UNIT_ASSERT(testTable != "/Root/benchTable"); // TODO: check schema instead
TLocalHelper lHelper(kikimr);
- if (withSomeNulls)
+ if (withSomeNulls) {
lHelper.WithSomeNulls();
+ }
auto batch = lHelper.TestArrowBatch(pathIdBegin, tsBegin, rowCount);
lHelper.SendDataViaActorSystem(testTable, batch);
}
@@ -5292,6 +5293,43 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
testHelper.ReadData("SELECT * FROM `/Root/ColumnTableTest` WHERE id=2", "[[2;\"test_res_2\";#;[\"val1\"]]]");
}
+ Y_UNIT_TEST(BulkUpsertUpdate) {
+ TKikimrSettings runnerSettings;
+ runnerSettings.WithSampleTables = false;
+ auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NYDBTest::NColumnShard::TController>();
+ TTestHelper testHelper(runnerSettings);
+
+ TVector<TTestHelper::TColumnSchema> schema = {
+ TTestHelper::TColumnSchema().SetName("id").SetType(NScheme::NTypeIds::Int64).SetNullable(false),
+ TTestHelper::TColumnSchema().SetName("value").SetType(NScheme::NTypeIds::Int32).SetNullable(false),
+ };
+
+ TTestHelper::TColumnTable testTable;
+ testTable.SetName("/Root/ColumnTableTest").SetPrimaryKey({ "id" }).SetSharding({ "id" }).SetSchema(schema);
+ testHelper.CreateTable(testTable);
+ {
+ TTestHelper::TUpdatesBuilder tableInserter(testTable.GetArrowSchema(schema));
+ tableInserter.AddRow().Add(1).Add(10);
+ testHelper.BulkUpsert(testTable, tableInserter);
+ }
+ while (csController->GetIndexations().Val() < 1) {
+ Cout << "Wait indexation..." << Endl;
+ Sleep(TDuration::Seconds(2));
+ }
+ testHelper.ReadData("SELECT value FROM `/Root/ColumnTableTest` WHERE id = 1", "[[10]]");
+ {
+ TTestHelper::TUpdatesBuilder tableInserter(testTable.GetArrowSchema(schema));
+ tableInserter.AddRow().Add(1).Add(110);
+ testHelper.BulkUpsert(testTable, tableInserter);
+ }
+ testHelper.ReadData("SELECT value FROM `/Root/ColumnTableTest` WHERE id = 1", "[[110]]");
+ while (csController->GetIndexations().Val() < 2) {
+ Cout << "Wait indexation..." << Endl;
+ Sleep(TDuration::Seconds(2));
+ }
+ testHelper.ReadData("SELECT value FROM `/Root/ColumnTableTest` WHERE id = 1", "[[110]]");
+ }
+
}
} // namespace NKqp