diff options
author | ivanmorozov333 <ivanmorozov@ydb.tech> | 2024-04-24 15:06:08 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-04-24 15:06:08 +0300 |
commit | 16a42e93c539c43df1c9d8ae3b64b416b890d1c9 (patch) | |
tree | 3bf09948a64c570101c052dba9b70483d8ff3810 | |
parent | 087c843e70c2dfebdf7660706c7e5eed2b946e97 (diff) | |
download | ydb-16a42e93c539c43df1c9d8ae3b64b416b890d1c9.tar.gz |
rename and additional methods for tx operations (#4003)
42 files changed, 440 insertions, 247 deletions
diff --git a/ydb/core/kqp/ut/common/columnshard.cpp b/ydb/core/kqp/ut/common/columnshard.cpp index bef7d6cb91a..441be48503c 100644 --- a/ydb/core/kqp/ut/common/columnshard.cpp +++ b/ydb/core/kqp/ut/common/columnshard.cpp @@ -1,5 +1,6 @@ #include "columnshard.h" #include <ydb/core/testlib/cs_helper.h> +#include <ydb/core/base/tablet_pipecache.h> extern "C" { #include <ydb/library/yql/parser/pg_wrapper/postgresql/src/include/catalog/pg_type_d.h> diff --git a/ydb/core/kqp/ut/common/columnshard.h b/ydb/core/kqp/ut/common/columnshard.h index 34046f1b110..a938e91b4d4 100644 --- a/ydb/core/kqp/ut/common/columnshard.h +++ b/ydb/core/kqp/ut/common/columnshard.h @@ -5,7 +5,7 @@ #include <ydb/public/lib/scheme_types/scheme_type_id.h> #include <ydb/public/sdk/cpp/client/ydb_table/table.h> #include <ydb/public/sdk/cpp/client/ydb_types/status_codes.h> -#include <ydb/core/tx/columnshard/columnshard_ut_common.h> +#include <ydb/core/tx/columnshard/test_helper/columnshard_ut_common.h> #include <ydb/core/formats/arrow/simple_builder/filler.h> #include <ydb/core/formats/arrow/simple_builder/array.h> diff --git a/ydb/core/kqp/ut/common/ya.make b/ydb/core/kqp/ut/common/ya.make index 97281a7aad6..db2b3dbfc3a 100644 --- a/ydb/core/kqp/ut/common/ya.make +++ b/ydb/core/kqp/ut/common/ya.make @@ -18,6 +18,7 @@ PEERDIR( ydb/library/yql/udfs/common/string ydb/library/yql/utils/backtrace ydb/public/lib/yson_value + ydb/core/tx/columnshard/test_helper ydb/public/sdk/cpp/client/draft ydb/public/sdk/cpp/client/ydb_query ydb/public/sdk/cpp/client/ydb_proto diff --git a/ydb/core/kqp/ut/olap/sys_view_ut.cpp b/ydb/core/kqp/ut/olap/sys_view_ut.cpp index 866563f088f..4242f767629 100644 --- a/ydb/core/kqp/ut/olap/sys_view_ut.cpp +++ b/ydb/core/kqp/ut/olap/sys_view_ut.cpp @@ -6,6 +6,7 @@ #include <library/cpp/testing/unittest/registar.h> #include <ydb/core/tx/columnshard/hooks/testing/controller.h> +#include <ydb/core/tx/columnshard/test_helper/controllers.h> namespace NKikimr::NKqp { @@ -14,57 +15,52 @@ Y_UNIT_TEST_SUITE(KqpOlapSysView) { auto settings = TKikimrSettings() .SetWithSampleTables(false); TKikimrRunner kikimr(settings); - static ui32 numKinds = 2; - auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NYDBTest::NColumnShard::TController>(); + auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NOlap::TWaitCompactionController>(); TLocalHelper(kikimr).CreateTestOlapTable(); for (ui64 i = 0; i < 100; ++i) { WriteTestData(kikimr, "/Root/olapStore/olapTable", 0, 1000000 + i * 10000, 1000); } + csController->WaitCompactions(TDuration::Seconds(10)); auto tableClient = kikimr.GetTableClient(); auto selectQuery = TString(R"( SELECT PathId, Kind, TabletId, Sum(Rows) as Rows - FROM `/Root/olapStore/.sys/store_primary_index_stats` + FROM `/Root/olapStore/.sys/store_primary_index_portion_stats` GROUP BY PathId, Kind, TabletId ORDER BY TabletId, Kind, PathId )"); auto rows = ExecuteScanQuery(tableClient, selectQuery); - UNIT_ASSERT_VALUES_EQUAL(rows.size(), numKinds*3); + UNIT_ASSERT_VALUES_EQUAL(rows.size(), 3); UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows[0].at("PathId")), 3ull); + UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows[1].at("PathId")), 3ull); + UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows[2].at("PathId")), 3ull); UNIT_ASSERT_VALUES_EQUAL(GetUtf8(rows[0].at("Kind")), "INSERTED"); - UNIT_ASSERT_GE(GetUint64(rows[0].at("TabletId")), 72075186224037888ull); - UNIT_ASSERT_GE(GetUint64(rows[2].at("TabletId")), 72075186224037889ull); - UNIT_ASSERT_GE(GetUint64(rows[4].at("TabletId")), 72075186224037890ull); - UNIT_ASSERT_GE(GetUint64(rows[1].at("TabletId")), GetUint64(rows[0].at("TabletId"))); + UNIT_ASSERT_VALUES_EQUAL(GetUtf8(rows[1].at("Kind")), "INSERTED"); UNIT_ASSERT_VALUES_EQUAL(GetUtf8(rows[2].at("Kind")), "INSERTED"); - UNIT_ASSERT_GE(GetUint64(rows[2].at("TabletId")), GetUint64(rows[1].at("TabletId"))); - UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows[3].at("PathId")), 3ull); - UNIT_ASSERT_VALUES_EQUAL(GetUtf8(rows[3].at("Kind")), "SPLIT_COMPACTED"); - UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows[3].at("TabletId")), GetUint64(rows[2].at("TabletId"))); - UNIT_ASSERT_VALUES_EQUAL(GetUtf8(rows[4].at("Kind")), "INSERTED"); - UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows[4].at("TabletId")), GetUint64(rows[5].at("TabletId"))); - UNIT_ASSERT_GE( - GetUint64(rows[0].at("Rows")) + GetUint64(rows[1].at("Rows")) + GetUint64(rows[2].at("Rows")) + - GetUint64(rows[3].at("Rows")) + GetUint64(rows[4].at("Rows")) + GetUint64(rows[5].at("Rows")), - 0.3*0.9*100*1000); // >= 90% of 100K inserted rows + UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows[0].at("TabletId")), 72075186224037888ull); + UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows[1].at("TabletId")), 72075186224037889ull); + UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows[2].at("TabletId")), 72075186224037890ull); + UNIT_ASSERT_VALUES_EQUAL( + GetUint64(rows[0].at("Rows")) + GetUint64(rows[1].at("Rows")) + GetUint64(rows[2].at("Rows")), + 100 * 1000); // >= 90% of 100K inserted rows } Y_UNIT_TEST(StatsSysViewTable) { auto settings = TKikimrSettings() .SetWithSampleTables(false); TKikimrRunner kikimr(settings); - auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NYDBTest::NColumnShard::TController>(); - static ui32 numKinds = 5; + auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NOlap::TWaitCompactionController>(); TLocalHelper(kikimr).CreateTestOlapTable("olapTable_1"); TLocalHelper(kikimr).CreateTestOlapTable("olapTable_2"); for (ui64 i = 0; i < 10; ++i) { - WriteTestData(kikimr, "/Root/olapStore/olapTable_1", 0, 1000000 + i*10000, 1000); - WriteTestData(kikimr, "/Root/olapStore/olapTable_2", 0, 1000000 + i*10000, 2000); + WriteTestData(kikimr, "/Root/olapStore/olapTable_1", 0, 1000000 + i * 10000, 1000); + WriteTestData(kikimr, "/Root/olapStore/olapTable_2", 0, 1000000 + i * 10000, 2000); } + csController->WaitCompactions(TDuration::Seconds(10)); auto tableClient = kikimr.GetTableClient(); { @@ -77,8 +73,7 @@ Y_UNIT_TEST_SUITE(KqpOlapSysView) { auto rows = ExecuteScanQuery(tableClient, selectQuery); - UNIT_ASSERT_GT(rows.size(), 1*numKinds); - UNIT_ASSERT_LE(rows.size(), 3*numKinds); + UNIT_ASSERT_VALUES_EQUAL(rows.size(), 3); UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows.front().at("PathId")), 3ull); UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows.back().at("PathId")), 3ull); } @@ -92,8 +87,7 @@ Y_UNIT_TEST_SUITE(KqpOlapSysView) { auto rows = ExecuteScanQuery(tableClient, selectQuery); - UNIT_ASSERT_GT(rows.size(), 1*numKinds); - UNIT_ASSERT_LE(rows.size(), 3*numKinds); + UNIT_ASSERT_VALUES_EQUAL(rows.size(), 3); UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows.front().at("PathId")), 4ull); UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows.back().at("PathId")), 4ull); } @@ -116,8 +110,7 @@ Y_UNIT_TEST_SUITE(KqpOlapSysView) { ui64 rawBytesPK1; ui64 bytesPK1; { - auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NYDBTest::NColumnShard::TController>(); - csController->SetPeriodicWakeupActivationPeriod(TDuration::Seconds(1)); + auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NOlap::TWaitCompactionController>(); auto settings = TKikimrSettings() .SetWithSampleTables(false); TKikimrRunner kikimr(settings); @@ -125,10 +118,11 @@ Y_UNIT_TEST_SUITE(KqpOlapSysView) { TTypedLocalHelper helper("", kikimr, "olapTable", "olapStore12"); helper.CreateTestOlapTable(); helper.FillPKOnly(0, 800000); + csController->WaitCompactions(TDuration::Seconds(10)); helper.GetVolumes(rawBytesPK1, bytesPK1, false); } - auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NYDBTest::NColumnShard::TController>(); + auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NOlap::TWaitCompactionController>(); ui64 rawBytesUnpack1PK = 0; ui64 bytesUnpack1PK = 0; ui64 rawBytesPackAndUnpack2PK; @@ -144,6 +138,7 @@ Y_UNIT_TEST_SUITE(KqpOlapSysView) { helper.CreateTestOlapTable(); NArrow::NConstruction::TStringPoolFiller sPool(groupsCount, 52); helper.FillTable(sPool, 0, rowsCount); + csController->WaitCompactions(TDuration::Seconds(10)); helper.PrintCount(); { auto d = helper.GetDistribution(); @@ -152,14 +147,12 @@ Y_UNIT_TEST_SUITE(KqpOlapSysView) { Y_ABORT_UNLESS(d.GetMaxCount() - d.GetMinCount() <= 1); } helper.GetVolumes(rawBytesUnpack1PK, bytesUnpack1PK, false); - Sleep(TDuration::Seconds(5)); auto tableClient = kikimr.GetTableClient(); helper.ExecuteSchemeQuery(TStringBuilder() << "ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=ALTER_COLUMN, NAME=field, `ENCODING.DICTIONARY.ENABLED`=`true`);"); helper.ExecuteSchemeQuery("ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=ALTER_COLUMN, NAME=field1, `ENCODING.DICTIONARY.ENABLED`=`true`);", NYdb::EStatus::SCHEME_ERROR); helper.ExecuteSchemeQuery("ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=ALTER_COLUMN, NAME=field, `ENCODING.DICTIONARY.ENABLED1`=`true`);", NYdb::EStatus::GENERIC_ERROR); - Sleep(TDuration::Seconds(5)); helper.FillTable(sPool, 1, rowsCount); - Sleep(TDuration::Seconds(5)); + csController->WaitCompactions(TDuration::Seconds(10)); { helper.GetVolumes(rawBytesPackAndUnpack2PK, bytesPackAndUnpack2PK, false); helper.PrintCount(); @@ -172,6 +165,7 @@ Y_UNIT_TEST_SUITE(KqpOlapSysView) { } } helper.ExecuteSchemeQuery("ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=ALTER_COLUMN, NAME=field, `SERIALIZER.CLASS_NAME`=`ARROW_SERIALIZER`, `COMPRESSION.TYPE`=`zstd`);"); + csController->WaitCompactions(TDuration::Seconds(10)); } const ui64 rawBytesUnpack = rawBytesUnpack1PK - rawBytesPK1; const ui64 bytesUnpack = bytesUnpack1PK - bytesPK1; @@ -190,8 +184,7 @@ Y_UNIT_TEST_SUITE(KqpOlapSysView) { Y_UNIT_TEST(StatsSysViewBytesPackActualization) { ui64 rawBytesPK1; ui64 bytesPK1; - auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NYDBTest::NColumnShard::TController>(); - csController->SetPeriodicWakeupActivationPeriod(TDuration::Seconds(1)); + auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NOlap::TWaitCompactionController>(); auto settings = TKikimrSettings() .SetWithSampleTables(false); TKikimrRunner kikimr(settings); @@ -199,6 +192,8 @@ Y_UNIT_TEST_SUITE(KqpOlapSysView) { TTypedLocalHelper helper("", kikimr, "olapTable", "olapStore"); helper.CreateTestOlapTable(); helper.FillPKOnly(0, 800000); + csController->WaitCompactions(TDuration::Seconds(10)); + helper.GetVolumes(rawBytesPK1, bytesPK1, false, {"pk_int"}); auto tableClient = kikimr.GetTableClient(); { @@ -226,16 +221,16 @@ Y_UNIT_TEST_SUITE(KqpOlapSysView) { Y_UNIT_TEST(StatsSysViewBytesColumnActualization) { ui64 rawBytes1; ui64 bytes1; - auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NYDBTest::NColumnShard::TController>(); - csController->SetPeriodicWakeupActivationPeriod(TDuration::Seconds(1)); - auto settings = TKikimrSettings() - .SetWithSampleTables(false); + auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NOlap::TWaitCompactionController>(); + auto settings = TKikimrSettings().SetWithSampleTables(false); TKikimrRunner kikimr(settings); Tests::NCommon::TLoggerInit(kikimr).Initialize(); TTypedLocalHelper helper("Utf8", kikimr); helper.CreateTestOlapTable(); NArrow::NConstruction::TStringPoolFiller sPool(3, 52); helper.FillTable(sPool, 0, 800000); + csController->WaitCompactions(TDuration::Seconds(10)); + helper.GetVolumes(rawBytes1, bytes1, false, {"new_column_ui64"}); AFL_VERIFY(rawBytes1 == 0); AFL_VERIFY(bytes1 == 0); @@ -255,8 +250,7 @@ Y_UNIT_TEST_SUITE(KqpOlapSysView) { Y_UNIT_TEST(StatsSysViewBytesDictActualization) { ui64 rawBytes1; ui64 bytes1; - auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NYDBTest::NColumnShard::TController>(); - csController->SetPeriodicWakeupActivationPeriod(TDuration::Seconds(1)); + auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NOlap::TWaitCompactionController>(); auto settings = TKikimrSettings() .SetWithSampleTables(false); TKikimrRunner kikimr(settings); @@ -265,6 +259,8 @@ Y_UNIT_TEST_SUITE(KqpOlapSysView) { helper.CreateTestOlapTable(); NArrow::NConstruction::TStringPoolFiller sPool(3, 52); helper.FillTable(sPool, 0, 800000); + csController->WaitCompactions(TDuration::Seconds(10)); + helper.GetVolumes(rawBytes1, bytes1, false, {"field"}); auto tableClient = kikimr.GetTableClient(); { @@ -292,8 +288,7 @@ Y_UNIT_TEST_SUITE(KqpOlapSysView) { Y_UNIT_TEST(StatsSysViewBytesDictStatActualization) { ui64 rawBytes1; ui64 bytes1; - auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NYDBTest::NColumnShard::TController>(); - csController->SetPeriodicWakeupActivationPeriod(TDuration::Seconds(1)); + auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NOlap::TWaitCompactionController>(); auto settings = TKikimrSettings().SetWithSampleTables(false); TKikimrRunner kikimr(settings); Tests::NCommon::TLoggerInit(kikimr).Initialize(); @@ -301,6 +296,8 @@ Y_UNIT_TEST_SUITE(KqpOlapSysView) { helper.CreateTestOlapTable(); NArrow::NConstruction::TStringPoolFiller sPool(3, 52); helper.FillTable(sPool, 0, 800000); + csController->WaitCompactions(TDuration::Seconds(10)); + helper.GetVolumes(rawBytes1, bytes1, false, {"field"}); auto tableClient = kikimr.GetTableClient(); { @@ -308,52 +305,73 @@ Y_UNIT_TEST_SUITE(KqpOlapSysView) { helper.ExecuteSchemeQuery("ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_STAT, NAME=field_var, TYPE=variability, FEATURES=`{\"column_name\" : \"field\"}`);"); helper.ExecuteSchemeQuery("ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_STAT, NAME=pk_int_max, TYPE=max, FEATURES=`{\"column_name\" : \"pk_int\"}`);"); helper.ExecuteSchemeQuery("ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_OPTIONS, SCHEME_NEED_ACTUALIZATION=`true`);"); - csController->WaitActualization(TDuration::Seconds(10)); - ui64 rawBytes2; - ui64 bytes2; - helper.GetVolumes(rawBytes2, bytes2, false, {"field"}); - AFL_VERIFY(rawBytes2 == rawBytes1)("f1", rawBytes1)("f2", rawBytes2); - AFL_VERIFY(bytes2 < bytes1 * 0.5)("f1", bytes1)("f2", bytes2); - std::vector<NKikimrColumnShardStatisticsProto::TPortionStorage> stats; - helper.GetStats(stats, true); - for (auto&& i : stats) { - AFL_VERIFY(i.ScalarsSize() == 2); - AFL_VERIFY(i.GetScalars()[0].GetUint32() == 3); - } + csController->WaitCondition(TDuration::Seconds(10), [&]() { + ui64 rawBytes2; + ui64 bytes2; + helper.GetVolumes(rawBytes2, bytes2, false, {"field"}); + AFL_VERIFY(rawBytes2 == rawBytes1)("f1", rawBytes1)("f2", rawBytes2); + AFL_VERIFY(bytes2 < bytes1 * 0.5)("f1", bytes1)("f2", bytes2); + std::vector<NKikimrColumnShardStatisticsProto::TPortionStorage> stats; + helper.GetStats(stats, true); + for (auto&& i : stats) { + if (i.ScalarsSize() != 2) { + return false; + } + if (i.GetScalars()[0].GetUint32() != 3) { + return false; + } + } + return true; + } + ); } { helper.ExecuteSchemeQuery("ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=DROP_STAT, NAME=pk_int_max);"); helper.ExecuteSchemeQuery("ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_OPTIONS, SCHEME_NEED_ACTUALIZATION=`true`);"); - csController->WaitActualization(TDuration::Seconds(10)); - std::vector<NKikimrColumnShardStatisticsProto::TPortionStorage> stats; - helper.GetStats(stats, true); - for (auto&& i : stats) { - AFL_VERIFY(i.ScalarsSize() == 1); - AFL_VERIFY(i.GetScalars()[0].GetUint32() == 3); - } + csController->WaitCondition(TDuration::Seconds(10), [&]() { + std::vector<NKikimrColumnShardStatisticsProto::TPortionStorage> stats; + helper.GetStats(stats, true); + for (auto&& i : stats) { + if (i.ScalarsSize() != 1) { + return false; + } + if (i.GetScalars()[0].GetUint32() != 3) { + return false; + } + } + return true; + }); } { helper.ExecuteSchemeQuery("ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_STAT, NAME=pk_int_max, TYPE=max, FEATURES=`{\"column_name\" : \"pk_int\"}`);"); helper.ExecuteSchemeQuery("ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_OPTIONS, SCHEME_NEED_ACTUALIZATION=`true`);"); - csController->WaitActualization(TDuration::Seconds(10)); - std::vector<NKikimrColumnShardStatisticsProto::TPortionStorage> stats; - helper.GetStats(stats, true); - for (auto&& i : stats) { - AFL_VERIFY(i.ScalarsSize() == 2); - AFL_VERIFY(i.GetScalars()[0].GetUint32() == 3); - } + csController->WaitCondition(TDuration::Seconds(10), [&]() { + std::vector<NKikimrColumnShardStatisticsProto::TPortionStorage> stats; + helper.GetStats(stats, true); + for (auto&& i : stats) { + if (i.ScalarsSize() != 2) { + return false; + } + if (i.GetScalars()[0].GetUint32() != 3) { + return false; + } + } + return true; + } + ); } } Y_UNIT_TEST(StatsSysViewColumns) { auto settings = TKikimrSettings().SetWithSampleTables(false); - auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NYDBTest::NColumnShard::TController>(); + auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NOlap::TWaitCompactionController>(); TKikimrRunner kikimr(settings); TLocalHelper(kikimr.GetTestServer()).CreateTestOlapTable(); for (ui64 i = 0; i < 10; ++i) { - WriteTestData(kikimr, "/Root/olapStore/olapTable", 0, 1000000 + i*10000, 2000); + WriteTestData(kikimr, "/Root/olapStore/olapTable", 0, 1000000 + i * 10000, 2000); } + csController->WaitCompactions(TDuration::Seconds(10)); auto tableClient = kikimr.GetTableClient(); @@ -388,16 +406,15 @@ Y_UNIT_TEST_SUITE(KqpOlapSysView) { } { auto selectQuery = TString(R"( - SELECT Sum(Rows) as Rows, Kind, Sum(RawBytes) as RawBytes, Sum(Rows) as Rows2, Sum(Rows) as Rows3, PathId - FROM `/Root/olapStore/.sys/store_primary_index_stats` + SELECT Sum(Rows) as Rows, Kind, Sum(ColumnRawBytes) as RawBytes, PathId + FROM `/Root/olapStore/.sys/store_primary_index_portion_stats` GROUP BY Kind, PathId - ORDER BY PathId, Kind, Rows3 + ORDER BY PathId, Kind, Rows )"); auto rows = ExecuteScanQuery(tableClient, selectQuery); - UNIT_ASSERT_VALUES_EQUAL(rows.size(), 2); - UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows[0].at("Rows2")), GetUint64(rows[0].at("Rows3"))); - UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows[1].at("Rows")), GetUint64(rows[1].at("Rows3"))); + UNIT_ASSERT_VALUES_EQUAL(rows.size(), 1); + UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows[0].at("Rows")), 20000); } } @@ -413,10 +430,11 @@ Y_UNIT_TEST_SUITE(KqpOlapSysView) { TLocalHelper(kikimr).CreateTestOlapTable("olapTable_3"); for (ui64 i = 0; i < 10; ++i) { - WriteTestData(kikimr, "/Root/olapStore/olapTable_1", 0, 1000000 + i*10000, 2000); - WriteTestData(kikimr, "/Root/olapStore/olapTable_2", 0, 1000000 + i*10000, 3000); - WriteTestData(kikimr, "/Root/olapStore/olapTable_3", 0, 1000000 + i*10000, 5000); + WriteTestData(kikimr, "/Root/olapStore/olapTable_1", 0, 1000000 + i * 10000, 2000); + WriteTestData(kikimr, "/Root/olapStore/olapTable_2", 0, 1000000 + i * 10000, 3000); + WriteTestData(kikimr, "/Root/olapStore/olapTable_3", 0, 1000000 + i * 10000, 5000); } + csController->WaitCompactions(TDuration::Seconds(10)); auto tableClient = kikimr.GetTableClient(); @@ -461,12 +479,12 @@ Y_UNIT_TEST_SUITE(KqpOlapSysView) { auto rows = ExecuteScanQuery(tableClient, selectQuery); - ui32 numExpected = 3*3; + ui32 numExpected = 3 * 3; UNIT_ASSERT_VALUES_EQUAL(rows.size(), numExpected); UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows[0].at("PathId")), 5ull); UNIT_ASSERT_VALUES_EQUAL(GetUtf8(rows[0].at("Kind")), "INSERTED"); - UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows[numExpected-1].at("PathId")), 3ull); - UNIT_ASSERT_VALUES_EQUAL(GetUtf8(rows[numExpected-1].at("Kind")), "INSERTED"); + UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows[numExpected - 1].at("PathId")), 3ull); + UNIT_ASSERT_VALUES_EQUAL(GetUtf8(rows[numExpected - 1].at("Kind")), "INSERTED"); } { @@ -484,24 +502,25 @@ Y_UNIT_TEST_SUITE(KqpOlapSysView) { auto rows = ExecuteScanQuery(tableClient, selectQuery); - ui32 numExpected = 2*3; + ui32 numExpected = 2 * 3; UNIT_ASSERT_VALUES_EQUAL(rows.size(), numExpected); UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows[0].at("PathId")), 5ull); UNIT_ASSERT_VALUES_EQUAL(GetUtf8(rows[0].at("Kind")), "INSERTED"); - UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows[numExpected-1].at("PathId")), 3ull); - UNIT_ASSERT_VALUES_EQUAL(GetUtf8(rows[numExpected-1].at("Kind")), "INSERTED"); + UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows[numExpected - 1].at("PathId")), 3ull); + UNIT_ASSERT_VALUES_EQUAL(GetUtf8(rows[numExpected - 1].at("Kind")), "INSERTED"); } } Y_UNIT_TEST(StatsSysViewFilter) { auto settings = TKikimrSettings().SetWithSampleTables(false); TKikimrRunner kikimr(settings); - auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NYDBTest::NColumnShard::TController>(); + auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NOlap::TWaitCompactionController>(); TLocalHelper(kikimr.GetTestServer()).CreateTestOlapTable(); for (ui64 i = 0; i < 10; ++i) { - WriteTestData(kikimr, "/Root/olapStore/olapTable", 0, 1000000 + i*10000, 2000); + WriteTestData(kikimr, "/Root/olapStore/olapTable", 0, 1000000 + i * 10000, 2000); } + csController->WaitCompactions(TDuration::Seconds(10)); auto tableClient = kikimr.GetTableClient(); @@ -548,31 +567,32 @@ Y_UNIT_TEST_SUITE(KqpOlapSysView) { auto selectQuery = TString(R"( SELECT PathId, Kind, TabletId FROM `/Root/olapStore/.sys/store_primary_index_stats` - WHERE Kind IN ('SPLIT_COMPACTED', 'INACTIVE', 'EVICTED') + WHERE Kind IN ('SPLIT_COMPACTED', 'INACTIVE', 'EVICTED', 'INSERTED') GROUP BY PathId, Kind, TabletId ORDER BY PathId, Kind, TabletId; )"); auto rows = ExecuteScanQuery(tableClient, selectQuery); - UNIT_ASSERT_GE(rows.size(), 3); + UNIT_ASSERT_VALUES_EQUAL(rows.size(), 3); } } Y_UNIT_TEST(StatsSysViewAggregation) { auto settings = TKikimrSettings().SetWithSampleTables(false); TKikimrRunner kikimr(settings); - auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NYDBTest::NColumnShard::TController>(); + auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NOlap::TWaitCompactionController>(); TLocalHelper(kikimr.GetTestServer()).CreateTestOlapTable("olapTable_1"); TLocalHelper(kikimr.GetTestServer()).CreateTestOlapTable("olapTable_2"); TLocalHelper(kikimr.GetTestServer()).CreateTestOlapTable("olapTable_3"); for (ui64 i = 0; i < 100; ++i) { - WriteTestData(kikimr, "/Root/olapStore/olapTable_1", 0, 1000000 + i*10000, 1000); - WriteTestData(kikimr, "/Root/olapStore/olapTable_2", 0, 1000000 + i*10000, 2000); - WriteTestData(kikimr, "/Root/olapStore/olapTable_3", 0, 1000000 + i*10000, 3000); + WriteTestData(kikimr, "/Root/olapStore/olapTable_1", 0, 1000000 + i * 10000, 1000); + WriteTestData(kikimr, "/Root/olapStore/olapTable_2", 0, 1000000 + i * 10000, 2000); + WriteTestData(kikimr, "/Root/olapStore/olapTable_3", 0, 1000000 + i * 10000, 3000); } + csController->WaitCompactions(TDuration::Seconds(10)); Tests::NCommon::TLoggerInit(kikimr).Initialize(); @@ -684,22 +704,22 @@ Y_UNIT_TEST_SUITE(KqpOlapSysView) { auto rows = ExecuteScanQuery(tableClient, selectQuery); // 3 Tables with 3 Shards each and 2 KindId-s of stats - UNIT_ASSERT_VALUES_EQUAL(rows.size(), 3 * 3 * 2); + UNIT_ASSERT_VALUES_EQUAL(rows.size(), 3 * 3); } { auto selectQuery = TString(R"( SELECT - count(distinct(PathId)), - count(distinct(Kind)), - count(distinct(TabletId)) + count(distinct(PathId)) as PathsCount, + count(distinct(Kind)) as KindsCount, + count(distinct(TabletId)) as TabletsCount FROM `/Root/olapStore/.sys/store_primary_index_stats` )"); auto rows = ExecuteScanQuery(tableClient, selectQuery); - UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows[0].at("column0")), 3ull); - UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows[0].at("column1")), 2); - UNIT_ASSERT_GE(GetUint64(rows[0].at("column2")), 3ull); + UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows[0].at("PathsCount")), 3ull); + UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows[0].at("KindsCount")), 1); + UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows[0].at("TabletsCount")), 4); } { diff --git a/ydb/core/kqp/ut/olap/ya.make b/ydb/core/kqp/ut/olap/ya.make index f359312ebdb..352b7ac78f7 100644 --- a/ydb/core/kqp/ut/olap/ya.make +++ b/ydb/core/kqp/ut/olap/ya.make @@ -29,6 +29,8 @@ PEERDIR( ydb/core/kqp/ut/common ydb/library/yql/sql/pg_dummy ydb/core/tx/columnshard/hooks/testing + ydb/core/tx/columnshard/test_helper + ydb/core/tx/columnshard ydb/core/kqp/ut/olap/helpers ydb/core/tx/datashard/ut_common ) diff --git a/ydb/core/kqp/ut/scheme/kqp_constraints_ut.cpp b/ydb/core/kqp/ut/scheme/kqp_constraints_ut.cpp index 46e25ebdad9..0138de4bf61 100644 --- a/ydb/core/kqp/ut/scheme/kqp_constraints_ut.cpp +++ b/ydb/core/kqp/ut/scheme/kqp_constraints_ut.cpp @@ -1,7 +1,8 @@ #include <ydb/core/kqp/ut/common/kqp_ut_common.h> #include <ydb/core/formats/arrow/arrow_helpers.h> +#include <ydb/core/tx/columnshard/test_helper/columnshard_ut_common.h> +#include <ydb/core/tx/datashard/datashard.h> #include <ydb/core/tx/tx_proxy/proxy.h> -#include <ydb/core/tx/columnshard/columnshard_ut_common.h> #include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h> #include <ydb/public/sdk/cpp/client/ydb_scheme/scheme.h> #include <ydb/public/sdk/cpp/client/ydb_topic/topic.h> diff --git a/ydb/core/tx/columnshard/background_controller.h b/ydb/core/tx/columnshard/background_controller.h index b88e1be3bce..bb38f274406 100644 --- a/ydb/core/tx/columnshard/background_controller.h +++ b/ydb/core/tx/columnshard/background_controller.h @@ -17,6 +17,7 @@ private: bool ActiveCleanupPortions = false; bool ActiveCleanupTables = false; + bool ActiveCleanupInsertTable = false; YDB_READONLY(TMonotonic, LastIndexationInstant, TMonotonic::Zero()); public: THashSet<NOlap::TPortionAddress> GetConflictTTLPortions() const; @@ -66,6 +67,18 @@ public: bool IsCleanupTablesActive() const { return ActiveCleanupTables; } + + void StartCleanupInsertTable() { + Y_ABORT_UNLESS(!ActiveCleanupInsertTable); + ActiveCleanupInsertTable = true; + } + void FinishCleanupInsertTable() { + Y_ABORT_UNLESS(ActiveCleanupInsertTable); + ActiveCleanupInsertTable = false; + } + bool IsCleanupInsertTableActive() const { + return ActiveCleanupInsertTable; + } }; } diff --git a/ydb/core/tx/columnshard/blobs_action/transaction/tx_gc_insert_table.cpp b/ydb/core/tx/columnshard/blobs_action/transaction/tx_gc_insert_table.cpp index 1571564621d..20507a0c44f 100644 --- a/ydb/core/tx/columnshard/blobs_action/transaction/tx_gc_insert_table.cpp +++ b/ydb/core/tx/columnshard/blobs_action/transaction/tx_gc_insert_table.cpp @@ -30,6 +30,7 @@ void TTxInsertTableCleanup::Complete(const TActorContext& /*ctx*/) { Y_ABORT_UNLESS(BlobsAction); BlobsAction->OnCompleteTxAfterRemoving(*Self, true); + Self->BackgroundController.FinishCleanupInsertTable(); Self->EnqueueBackgroundActivities(); } diff --git a/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp b/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp index f4bd0b23037..7b9a74b7885 100644 --- a/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp +++ b/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp @@ -130,6 +130,7 @@ void TTxWrite::Complete(const TActorContext& ctx) { const auto& writeMeta = buffer.GetAggregations()[i]->GetWriteData()->GetWriteMeta(); auto operation = Self->OperationsManager->GetOperation((TWriteId)writeMeta.GetWriteId()); if (operation) { + CompleteTransaction(operation->GetLockId(), ctx); ctx.Send(writeMeta.GetSource(), Results[i].release(), 0, operation->GetCookie()); } else { ctx.Send(writeMeta.GetSource(), Results[i].release()); diff --git a/ydb/core/tx/columnshard/columnshard__progress_tx.cpp b/ydb/core/tx/columnshard/columnshard__progress_tx.cpp index 7d0478aa759..6db95153b6d 100644 --- a/ydb/core/tx/columnshard/columnshard__progress_tx.cpp +++ b/ydb/core/tx/columnshard/columnshard__progress_tx.cpp @@ -41,14 +41,8 @@ public: } TxOperator = Self->ProgressTxController->GetVerifiedTxOperator(txId); - AFL_VERIFY(TxOperator->Progress(*Self, NOlap::TSnapshot(step, txId), txc)); + AFL_VERIFY(TxOperator->ExecuteOnProgress(*Self, NOlap::TSnapshot(step, txId), txc)); Self->ProgressTxController->FinishPlannedTx(txId, txc); - Self->RescheduleWaitingReads(); - } - - Self->ProgressTxInFlight = false; - if (!!Self->ProgressTxController->GetPlannedTx()) { - Self->EnqueueProgressTx(ctx); } return true; } @@ -56,7 +50,8 @@ public: void Complete(const TActorContext& ctx) override { NActors::TLogContextGuard logGuard = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())("tx_state", "complete"); if (TxOperator) { - TxOperator->Complete(*Self, ctx); + TxOperator->CompleteOnProgress(*Self, ctx); + Self->RescheduleWaitingReads(); } if (PlannedQueueItem) { Self->GetProgressTxController().CompleteRunningTx(*PlannedQueueItem); @@ -64,11 +59,15 @@ public: if (LastCompletedTx) { Self->LastCompletedTx = std::max(*LastCompletedTx, Self->LastCompletedTx); } - Self->SetupIndexation(); + Self->ProgressTxInFlight = false; + if (!!Self->ProgressTxController->GetPlannedTx()) { + Self->EnqueueProgressTx(ctx); + } + Self->EnqueueBackgroundActivities(false); } private: - TTxController::ITransactionOperatior::TPtr TxOperator; + TTxController::ITransactionOperator::TPtr TxOperator; const ui32 TabletTxNo; std::optional<NOlap::TSnapshot> LastCompletedTx; std::optional<TTxController::TPlanQueueItem> PlannedQueueItem; diff --git a/ydb/core/tx/columnshard/columnshard__propose_cancel.cpp b/ydb/core/tx/columnshard/columnshard__propose_cancel.cpp index 9b9ed721cc0..d3da820725a 100644 --- a/ydb/core/tx/columnshard/columnshard__propose_cancel.cpp +++ b/ydb/core/tx/columnshard/columnshard__propose_cancel.cpp @@ -19,12 +19,15 @@ public: const auto* msg = Ev->Get(); const ui64 txId = msg->Record.GetTxId(); - Self->ProgressTxController->CancelTx(txId, txc); + Self->ProgressTxController->ExecuteOnCancel(txId, txc); return true; } - void Complete(const TActorContext&) override { + void Complete(const TActorContext& ctx) override { LOG_S_DEBUG("TTxProposeCancel.Complete"); + const auto* msg = Ev->Get(); + const ui64 txId = msg->Record.GetTxId(); + Self->ProgressTxController->CompleteOnCancel(txId, ctx); } private: diff --git a/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp b/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp index 3688e74126e..9c0265ceb6d 100644 --- a/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp +++ b/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp @@ -132,8 +132,12 @@ void TTxProposeTransaction::OnProposeResult(TTxController::TProposeResult& propo } void TTxProposeTransaction::Complete(const TActorContext& ctx) { + auto& record = Proto(Ev->Get()); + const ui64 txId = record.GetTxId(); + Y_ABORT_UNLESS(Ev); Y_ABORT_UNLESS(Result); + CompleteTransaction(txId, ctx); ctx.Send(Ev->Get()->GetSource(), Result.release()); Self->TryRegisterMediatorTimeCast(); } diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp index b04af261d98..14d35d75704 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.cpp +++ b/ydb/core/tx/columnshard/columnshard_impl.cpp @@ -521,7 +521,7 @@ void TColumnShard::EnqueueBackgroundActivities(const bool periodic) { SendPeriodicStats(); if (!TablesManager.HasPrimaryIndex()) { - LOG_S_NOTICE("Background activities cannot be started: no index at tablet " << TabletID()); + AFL_NOTICE(NKikimrServices::TX_COLUMNSHARD)("problem", "Background activities cannot be started: no index at tablet"); return; } // !!!!!! MUST BE FIRST THROUGH DATA HAVE TO BE SAME IN SESSIONS AFTER TABLET RESTART @@ -855,11 +855,16 @@ void TColumnShard::Handle(TEvPrivate::TEvGarbageCollectionFinished::TPtr& ev, co void TColumnShard::SetupCleanupInsertTable() { auto writeIdsToCleanup = InsertTable->OldWritesToAbort(AppData()->TimeProvider->Now()); + if (BackgroundController.IsCleanupInsertTableActive()) { + ACFL_DEBUG("background", "cleanup_insert_table")("skip_reason", "in_progress"); + return; + } + if (!InsertTable->GetAborted().size() && !writeIdsToCleanup.size()) { return; } AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "cleanup_started")("aborted", InsertTable->GetAborted().size())("to_cleanup", writeIdsToCleanup.size()); - + BackgroundController.StartCleanupInsertTable(); Execute(new TTxInsertTableCleanup(this, std::move(writeIdsToCleanup)), TActorContext::AsActorContext()); } diff --git a/ydb/core/tx/columnshard/engines/storage/granule.h b/ydb/core/tx/columnshard/engines/storage/granule.h index 1002139fbf3..35857032f8f 100644 --- a/ydb/core/tx/columnshard/engines/storage/granule.h +++ b/ydb/core/tx/columnshard/engines/storage/granule.h @@ -5,6 +5,7 @@ #include <ydb/core/tx/columnshard/counters/engine_logs.h> #include <ydb/core/tx/columnshard/engines/column_engine.h> #include <ydb/core/tx/columnshard/engines/portion_info.h> +#include <ydb/core/tx/columnshard/hooks/abstract/abstract.h> #include <ydb/core/base/appdata.h> #include <ydb/core/formats/arrow/reader/position.h> @@ -244,7 +245,7 @@ public: } void ActualizeOptimizer(const TInstant currentInstant) const { - if (currentInstant - OptimizerPlanner->GetActualizationInstant() > TDuration::Seconds(1)) { + if (currentInstant - OptimizerPlanner->GetActualizationInstant() >= NYDBTest::TControllers::GetColumnShardController()->GetCompactionActualizationLag(TDuration::Seconds(1))) { OptimizerPlanner->Actualize(currentInstant); } } diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lbuckets/optimizer.h b/ydb/core/tx/columnshard/engines/storage/optimizer/lbuckets/optimizer.h index 148dd5da4a2..40f44d6e51a 100644 --- a/ydb/core/tx/columnshard/engines/storage/optimizer/lbuckets/optimizer.h +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lbuckets/optimizer.h @@ -581,15 +581,15 @@ public: return 0; } */ - - const ui64 count = BucketInfo.GetCount() + ((mainPortion && !isFinal) ? 1 : 0); + const bool isForce = NYDBTest::TControllers::GetColumnShardController()->GetCompactionControl() == NYDBTest::EOptimizerCompactionWeightControl::Force; + const ui64 count = BucketInfo.GetCount() + ((mainPortion && (!isFinal || isForce)) ? 1 : 0); const ui64 recordsCount = BucketInfo.GetRecordsCount() + ((mainPortion && !isFinal) ? mainPortion->GetRecordsCount() : 0); const ui64 sumBytes = BucketInfo.GetBytes() + ((mainPortion && !isFinal) ? mainPortion->GetTotalBlobBytes() : 0); if (NYDBTest::TControllers::GetColumnShardController()->GetCompactionControl() == NYDBTest::EOptimizerCompactionWeightControl::Disable) { return 0; } const ui64 weight = (10000000000.0 * count - sumBytes) * (isFinal ? 1 : 10); - if (NYDBTest::TControllers::GetColumnShardController()->GetCompactionControl() == NYDBTest::EOptimizerCompactionWeightControl::Force) { + if (isForce) { return (count > 1) ? weight : 0; } @@ -799,10 +799,14 @@ public: } } else { if (MainPortion) { - for (auto&& i : portions) { - if (MainPortion->CrossPKWith(*i)) { - portions.emplace_back(MainPortion); - break; + if (portions.size() == 1) { + portions.emplace_back(MainPortion); + } else { + for (auto&& i : portions) { + if (MainPortion->CrossPKWith(*i)) { + portions.emplace_back(MainPortion); + break; + } } } } @@ -811,7 +815,7 @@ public: stopInstant = Others.GetFutureStartInstant(); } } - AFL_VERIFY(portions.size() > 1); + AFL_VERIFY(portions.size() > 1)("size", portions.size()); ui64 size = 0; for (auto&& i : portions) { size += i->GetTotalBlobBytes(); diff --git a/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp b/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp index 9a20c83958c..fc4607fb623 100644 --- a/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp +++ b/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp @@ -6,7 +6,7 @@ #include <ydb/core/tx/columnshard/engines/changes/indexation.h> #include <ydb/core/tx/columnshard/engines/changes/ttl.h> -#include <ydb/core/tx/columnshard/columnshard_ut_common.h> +#include <ydb/core/tx/columnshard/test_helper/columnshard_ut_common.h> #include <ydb/core/tx/columnshard/engines/changes/compaction.h> #include <ydb/core/tx/columnshard/engines/portions/write_with_blobs.h> #include <ydb/core/tx/columnshard/blobs_action/bs/storage.h> @@ -16,6 +16,8 @@ #include <ydb/core/tx/columnshard/background_controller.h> #include <ydb/core/tx/columnshard/engines/changes/abstract/abstract.h> #include <ydb/core/tx/columnshard/test_helper/helper.h> +#include <ydb/core/tx/columnshard/engines/insert_table/insert_table.h> +#include <ydb/core/tx/columnshard/columnshard_schema.h> namespace NKikimr { diff --git a/ydb/core/tx/columnshard/engines/ut/ut_program.cpp b/ydb/core/tx/columnshard/engines/ut/ut_program.cpp index a4586c48946..798ba6ec505 100644 --- a/ydb/core/tx/columnshard/engines/ut/ut_program.cpp +++ b/ydb/core/tx/columnshard/engines/ut/ut_program.cpp @@ -1,7 +1,7 @@ #include <ydb/core/tx/columnshard/engines/index_info.h> #include <ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/resolver.h> -#include <ydb/core/tx/columnshard/columnshard_ut_common.h> +#include <ydb/core/tx/columnshard/test_helper/columnshard_ut_common.h> #include <ydb/core/tx/columnshard/test_helper/helper.h> #include <ydb/core/tx/program/program.h> #include <ydb/core/formats/arrow/converter.h> diff --git a/ydb/core/tx/columnshard/engines/ut/ya.make b/ydb/core/tx/columnshard/engines/ut/ya.make index 41a7d7b2aac..44ab1190e1e 100644 --- a/ydb/core/tx/columnshard/engines/ut/ya.make +++ b/ydb/core/tx/columnshard/engines/ut/ya.make @@ -37,7 +37,6 @@ SRCS( ut_logs_engine.cpp ut_program.cpp helper.cpp - ydb/core/tx/columnshard/columnshard_ut_common.cpp ) END() diff --git a/ydb/core/tx/columnshard/hooks/abstract/abstract.h b/ydb/core/tx/columnshard/hooks/abstract/abstract.h index 7a64cc5a816..b1c13dbe04b 100644 --- a/ydb/core/tx/columnshard/hooks/abstract/abstract.h +++ b/ydb/core/tx/columnshard/hooks/abstract/abstract.h @@ -88,6 +88,10 @@ public: using TPtr = std::shared_ptr<ICSController>; virtual ~ICSController() = default; + virtual TDuration GetCompactionActualizationLag(const TDuration def) const { + return def; + } + virtual NColumnShard::TBlobPutResult::TPtr OverrideBlobPutResultOnCompaction(const NColumnShard::TBlobPutResult::TPtr original, const NOlap::TWriteActionsCollection& /*actions*/) const { return original; } diff --git a/ydb/core/tx/columnshard/hooks/testing/controller.h b/ydb/core/tx/columnshard/hooks/testing/controller.h index 9ba5865cdcd..1fcd157118c 100644 --- a/ydb/core/tx/columnshard/hooks/testing/controller.h +++ b/ydb/core/tx/columnshard/hooks/testing/controller.h @@ -206,6 +206,62 @@ protected: } public: + void WaitCompactions(const TDuration d) const { + TInstant start = TInstant::Now(); + ui32 compactionsStart = GetCompactionStartedCounter().Val(); + while (Now() - start < d) { + if (compactionsStart != GetCompactionStartedCounter().Val()) { + compactionsStart = GetCompactionStartedCounter().Val(); + start = TInstant::Now(); + } + Cerr << "WAIT_COMPACTION: " << GetCompactionStartedCounter().Val() << Endl; + Sleep(TDuration::Seconds(1)); + } + } + + void WaitIndexation(const TDuration d) const { + TInstant start = TInstant::Now(); + ui32 compactionsStart = GetInsertStartedCounter().Val(); + while (Now() - start < d) { + if (compactionsStart != GetInsertStartedCounter().Val()) { + compactionsStart = GetInsertStartedCounter().Val(); + start = TInstant::Now(); + } + Cerr << "WAIT_INDEXATION: " << GetInsertStartedCounter().Val() << Endl; + Sleep(TDuration::Seconds(1)); + } + } + + template <class TTester> + void WaitCondition(const TDuration d, const TTester& test) const { + const TInstant start = TInstant::Now(); + while (TInstant::Now() - start < d) { + if (test()) { + Cerr << "condition SUCCESS!!..." << TInstant::Now() - start << Endl; + return; + } else { + Cerr << "waiting condition..." << TInstant::Now() - start << Endl; + Sleep(TDuration::Seconds(1)); + } + } + AFL_VERIFY(false)("reason", "condition not reached"); + } + + void WaitActualization(const TDuration d) const { + TInstant start = TInstant::Now(); + const i64 startVal = NeedActualizationCount.Val(); + i64 predVal = NeedActualizationCount.Val(); + while (TInstant::Now() - start < d && (!startVal || NeedActualizationCount.Val())) { + Cerr << "waiting actualization: " << NeedActualizationCount.Val() << "/" << TInstant::Now() - start << Endl; + if (NeedActualizationCount.Val() != predVal) { + predVal = NeedActualizationCount.Val(); + start = TInstant::Now(); + } + Sleep(TDuration::Seconds(1)); + } + AFL_VERIFY(!NeedActualizationCount.Val()); + } + virtual TDuration GetRemovedPortionLivetime(const TDuration /*def*/) const override { return TDuration::Zero(); } @@ -240,15 +296,6 @@ public: DisabledBackgrounds.erase(id); } - void WaitActualization(const TDuration d) const { - const TInstant start = TInstant::Now(); - while (TInstant::Now() - start < d && NeedActualizationCount.Val()) { - Cerr << "waiting actualization: " << NeedActualizationCount.Val() << "/" << TInstant::Now() - start << Endl; - Sleep(TDuration::Seconds(1)); - } - AFL_VERIFY(!NeedActualizationCount.Val()); - } - std::vector<ui64> GetShardActualIds() const { TGuard<TMutex> g(Mutex); std::vector<ui64> result; diff --git a/ydb/core/tx/columnshard/columnshard_ut_common.cpp b/ydb/core/tx/columnshard/test_helper/columnshard_ut_common.cpp index 8c532e01223..a37342c4f72 100644 --- a/ydb/core/tx/columnshard/columnshard_ut_common.cpp +++ b/ydb/core/tx/columnshard/test_helper/columnshard_ut_common.cpp @@ -1,7 +1,7 @@ #include "columnshard_ut_common.h" -#include "common/tests/shard_reader.h" -#include "engines/reader/sys_view/chunks/chunks.h" +#include <ydb/core/tx/columnshard/common/tests/shard_reader.h> +#include <ydb/core/tx/columnshard/engines/reader/sys_view/portions/portions.h> #include <ydb/core/base/tablet.h> #include <ydb/core/base/tablet_resolver.h> @@ -170,11 +170,11 @@ void ScanIndexStats(TTestBasicRuntime& runtime, TActorId& sender, const std::vec record.SetTxId(snap.GetPlanStep()); record.SetScanId(scanId); // record.SetLocalPathId(0); - record.SetTablePath(NOlap::TIndexInfo::STORE_INDEX_STATS_TABLE); + record.SetTablePath(NOlap::TIndexInfo::STORE_INDEX_PORTION_STATS_TABLE); // Schema: pathId, kind, rows, bytes, rawBytes. PK: {pathId, kind} //record.SetSchemaVersion(0); - auto ydbSchema = NOlap::NReader::NSysView::NChunks::TStatsIterator::StatsSchema; + auto ydbSchema = NOlap::NReader::NSysView::NPortions::TStatsIterator::StatsSchema; for (const auto& col : ydbSchema.Columns) { record.AddColumnTags(col.second.Id); auto columnType = NScheme::ProtoColumnTypeFromTypeInfoMod(col.second.PType, col.second.PTypeMod); @@ -224,6 +224,11 @@ void PlanCommit(TTestBasicRuntime& runtime, TActorId& sender, ui64 planStep, con PlanCommit(runtime, sender, TTestTxConfig::TxTablet0, planStep, txIds); } +void Wakeup(TTestBasicRuntime& runtime, TActorId& sender, const ui64 shardId) { + auto wakeup = std::make_unique<TEvPrivate::TEvPeriodicWakeup>(true); + ForwardToTablet(runtime, shardId, sender, wakeup.release()); +} + void PlanCommit(TTestBasicRuntime& runtime, TActorId& sender, ui64 shardId, ui64 planStep, const TSet<ui64>& txIds) { auto plan = std::make_unique<TEvTxProcessing::TEvPlanStep>(planStep, 0, shardId); for (ui64 txId : txIds) { @@ -243,6 +248,7 @@ void PlanCommit(TTestBasicRuntime& runtime, TActorId& sender, ui64 shardId, ui64 UNIT_ASSERT(txIds.contains(res.GetTxId())); UNIT_ASSERT_EQUAL(res.GetStatus(), NKikimrTxColumnShard::EResultStatus::SUCCESS); } + Wakeup(runtime, sender, shardId); } TCell MakeTestCell(const TTypeInfo& typeInfo, ui32 value, std::vector<TString>& mem) { diff --git a/ydb/core/tx/columnshard/columnshard_ut_common.h b/ydb/core/tx/columnshard/test_helper/columnshard_ut_common.h index 8feec369ea2..c696d3b940c 100644 --- a/ydb/core/tx/columnshard/columnshard_ut_common.h +++ b/ydb/core/tx/columnshard/test_helper/columnshard_ut_common.h @@ -1,9 +1,8 @@ #pragma once -#include "columnshard.h" -#include "columnshard_impl.h" -#include "blob_cache.h" -#include "engines/scheme/statistics/max/operator.h" +#include <ydb/core/tx/columnshard/blob_cache.h> +#include <ydb/core/tx/columnshard/engines/scheme/statistics/max/operator.h> +#include <ydb/core/tx/columnshard/common/snapshot.h> #include <ydb/core/formats/arrow/arrow_batch_builder.h> #include <ydb/core/tx/columnshard/test_helper/helper.h> @@ -11,7 +10,15 @@ #include <ydb/core/scheme/scheme_types_proto.h> #include <ydb/core/testlib/tablet_helpers.h> #include <ydb/core/testlib/test_client.h> +#include <ydb/core/protos/tx_columnshard.pb.h> +#include <ydb/services/metadata/abstract/fetcher.h> + #include <library/cpp/testing/unittest/registar.h> +#include <ydb/core/tx/long_tx_service/public/types.h> + +namespace NKikimr::NOlap { +struct TIndexInfo; +} namespace NKikimr::NTxUT { @@ -449,6 +456,8 @@ inline void PlanCommit(TTestBasicRuntime& runtime, TActorId& sender, ui64 planSt PlanCommit(runtime, sender, planStep, ids); } +void Wakeup(TTestBasicRuntime& runtime, TActorId& sender, const ui64 shardId); + struct TTestBlobOptions { THashSet<TString> NullColumns; THashSet<TString> SameValueColumns; diff --git a/ydb/core/tx/columnshard/test_helper/controllers.cpp b/ydb/core/tx/columnshard/test_helper/controllers.cpp index d9ee86446c6..997a700d901 100644 --- a/ydb/core/tx/columnshard/test_helper/controllers.cpp +++ b/ydb/core/tx/columnshard/test_helper/controllers.cpp @@ -1,7 +1,7 @@ +#include "columnshard_ut_common.h" #include "controllers.h" #include <ydb/core/tx/columnshard/engines/changes/ttl.h> #include <ydb/core/tx/columnshard/engines/changes/indexation.h> -#include <ydb/core/tx/columnshard/columnshard_ut_common.h> namespace NKikimr::NOlap { diff --git a/ydb/core/tx/columnshard/test_helper/controllers.h b/ydb/core/tx/columnshard/test_helper/controllers.h index 682f2dcacaf..b18c2bc34e8 100644 --- a/ydb/core/tx/columnshard/test_helper/controllers.h +++ b/ydb/core/tx/columnshard/test_helper/controllers.h @@ -12,6 +12,7 @@ private: ui32 TiersModificationsCount = 0; YDB_READONLY(TAtomicCounter, StatisticsUsageCount, 0); YDB_READONLY(TAtomicCounter, MaxValueUsageCount, 0); + YDB_ACCESSOR_DEF(std::optional<ui64>, SmallSizeDetector); protected: virtual void OnTieringModified(const std::shared_ptr<NKikimr::NColumnShard::TTiersManager>& /*tiers*/) override; virtual void OnExportFinished() override { @@ -21,7 +22,7 @@ protected: return true; } virtual ui64 GetSmallPortionSizeDetector(const ui64 /*def*/) const override { - return 0; + return SmallSizeDetector.value_or(0); } virtual TDuration GetOptimizerFreshnessCheckDuration(const TDuration /*defaultValue*/) const override { return TDuration::Zero(); @@ -29,10 +30,17 @@ protected: virtual TDuration GetLagForCompactionBeforeTierings(const TDuration /*def*/) const override { return TDuration::Zero(); } + virtual TDuration GetCompactionActualizationLag(const TDuration /*def*/) const override { + return TDuration::Zero(); + } virtual TDuration GetTTLDefaultWaitingDuration(const TDuration /*defaultValue*/) const override { return TDuration::Seconds(1); } public: + TWaitCompactionController() { + SetPeriodicWakeupActivationPeriod(TDuration::Seconds(1)); + } + ui32 GetFinishedExportsCount() const { return ExportsFinishedCount.Val(); } diff --git a/ydb/core/tx/columnshard/test_helper/ya.make b/ydb/core/tx/columnshard/test_helper/ya.make index 2083768b46e..b75f43de54f 100644 --- a/ydb/core/tx/columnshard/test_helper/ya.make +++ b/ydb/core/tx/columnshard/test_helper/ya.make @@ -13,6 +13,7 @@ PEERDIR( SRCS( helper.cpp controllers.cpp + columnshard_ut_common.cpp ) YQL_LAST_ABI_VERSION() diff --git a/ydb/core/tx/columnshard/transactions/operators/backup.cpp b/ydb/core/tx/columnshard/transactions/operators/backup.cpp index f55d2b82b0b..232aac166da 100644 --- a/ydb/core/tx/columnshard/transactions/operators/backup.cpp +++ b/ydb/core/tx/columnshard/transactions/operators/backup.cpp @@ -4,7 +4,7 @@ namespace NKikimr::NColumnShard { -bool TBackupTransactionOperator::Parse(const TString& data) { +bool TBackupTransactionOperator::Parse(TColumnShard& /*owner*/, const TString& data) { NKikimrTxColumnShard::TBackupTxBody txBody; if (!txBody.ParseFromString(data)) { return false; @@ -32,7 +32,7 @@ bool TBackupTransactionOperator::Parse(const TString& data) { return true; } -TBackupTransactionOperator::TProposeResult TBackupTransactionOperator::Propose(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& /*txc*/, bool /*proposed*/) const { +TBackupTransactionOperator::TProposeResult TBackupTransactionOperator::ExecuteOnPropose(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& /*txc*/) const { auto proposition = owner.GetExportsManager()->ProposeTask(ExportTask); if (!proposition) { return TProposeResult(NKikimrTxColumnShard::EResultStatus::ERROR, @@ -41,14 +41,14 @@ TBackupTransactionOperator::TProposeResult TBackupTransactionOperator::Propose(T return TProposeResult(); } -bool TBackupTransactionOperator::Progress(TColumnShard& owner, const NOlap::TSnapshot& version, NTabletFlatExecutor::TTransactionContext& txc) { +bool TBackupTransactionOperator::ExecuteOnProgress(TColumnShard& owner, const NOlap::TSnapshot& version, NTabletFlatExecutor::TTransactionContext& txc) { Y_UNUSED(version); AFL_VERIFY(ExportTask); owner.GetExportsManager()->ConfirmSessionOnExecute(ExportTask->GetIdentifier(), txc); return true; } -bool TBackupTransactionOperator::Complete(TColumnShard& owner, const TActorContext& ctx) { +bool TBackupTransactionOperator::CompleteOnProgress(TColumnShard& owner, const TActorContext& ctx) { AFL_VERIFY(ExportTask); owner.GetExportsManager()->ConfirmSessionOnComplete(ExportTask->GetIdentifier()); auto result = std::make_unique<TEvColumnShard::TEvProposeTransactionResult>( @@ -59,7 +59,7 @@ bool TBackupTransactionOperator::Complete(TColumnShard& owner, const TActorConte return true; } -bool TBackupTransactionOperator::Abort(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc) { +bool TBackupTransactionOperator::ExecuteOnAbort(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc) { owner.GetExportsManager()->RemoveSession(ExportTask->GetIdentifier(), txc); return true; } diff --git a/ydb/core/tx/columnshard/transactions/operators/backup.h b/ydb/core/tx/columnshard/transactions/operators/backup.h index bc873e3dbda..1e9612f6309 100644 --- a/ydb/core/tx/columnshard/transactions/operators/backup.h +++ b/ydb/core/tx/columnshard/transactions/operators/backup.h @@ -6,24 +6,30 @@ namespace NKikimr::NColumnShard { - class TBackupTransactionOperator : public TTxController::ITransactionOperatior { + class TBackupTransactionOperator : public TTxController::ITransactionOperator { private: std::shared_ptr<NOlap::NExport::TExportTask> ExportTask; - using TBase = TTxController::ITransactionOperatior; + using TBase = TTxController::ITransactionOperator; using TProposeResult = TTxController::TProposeResult; static inline auto Registrator = TFactory::TRegistrator<TBackupTransactionOperator>(NKikimrTxColumnShard::TX_KIND_BACKUP); public: using TBase::TBase; - virtual bool Parse(const TString& data) override; + virtual bool Parse(TColumnShard& owner, const TString& data) override; - virtual TProposeResult Propose(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc, bool /*proposed*/) const override; + virtual TProposeResult ExecuteOnPropose(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc) const override; + virtual bool CompleteOnPropose(TColumnShard& /*owner*/, const TActorContext& /*ctx*/) const override { + return true; + } - virtual bool Progress(TColumnShard& owner, const NOlap::TSnapshot& version, NTabletFlatExecutor::TTransactionContext& txc) override; + virtual bool ExecuteOnProgress(TColumnShard& owner, const NOlap::TSnapshot& version, NTabletFlatExecutor::TTransactionContext& txc) override; - virtual bool Complete(TColumnShard& owner, const TActorContext& ctx) override; + virtual bool CompleteOnProgress(TColumnShard& owner, const TActorContext& ctx) override; - virtual bool Abort(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc) override; + virtual bool ExecuteOnAbort(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc) override; + virtual bool CompleteOnAbort(TColumnShard& /*owner*/, const TActorContext& /*ctx*/) override { + return true; + } }; } diff --git a/ydb/core/tx/columnshard/transactions/operators/ev_write.h b/ydb/core/tx/columnshard/transactions/operators/ev_write.h index ca83ce8696a..81e2ad35d8a 100644 --- a/ydb/core/tx/columnshard/transactions/operators/ev_write.h +++ b/ydb/core/tx/columnshard/transactions/operators/ev_write.h @@ -4,14 +4,14 @@ namespace NKikimr::NColumnShard { - class TEvWriteTransactionOperator : public TTxController::ITransactionOperatior { - using TBase = TTxController::ITransactionOperatior; + class TEvWriteTransactionOperator : public TTxController::ITransactionOperator { + using TBase = TTxController::ITransactionOperator; using TProposeResult = TTxController::TProposeResult; static inline auto Registrator = TFactory::TRegistrator<TEvWriteTransactionOperator>(NKikimrTxColumnShard::TX_KIND_COMMIT_WRITE); public: using TBase::TBase; - virtual bool Parse(const TString& data) override { + virtual bool Parse(TColumnShard& /*owner*/, const TString& data) override { NKikimrTxColumnShard::TCommitWriteTxBody commitTxBody; if (!commitTxBody.ParseFromString(data)) { return false; @@ -20,24 +20,32 @@ namespace NKikimr::NColumnShard { return !!LockId; } - TProposeResult Propose(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc, bool /*proposed*/) const override { + TProposeResult ExecuteOnPropose(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc) const override { owner.OperationsManager->LinkTransaction(LockId, GetTxId(), txc); return TProposeResult(); } - virtual bool Progress(TColumnShard& owner, const NOlap::TSnapshot& version, NTabletFlatExecutor::TTransactionContext& txc) override { + bool CompleteOnPropose(TColumnShard& /*owner*/, const TActorContext& /*ctx*/) const override { + return true; + } + + virtual bool ExecuteOnProgress(TColumnShard& owner, const NOlap::TSnapshot& version, NTabletFlatExecutor::TTransactionContext& txc) override { return owner.OperationsManager->CommitTransaction(owner, GetTxId(), txc, version); } - virtual bool Complete(TColumnShard& owner, const TActorContext& ctx) override { + virtual bool CompleteOnProgress(TColumnShard& owner, const TActorContext& ctx) override { auto result = NEvents::TDataEvents::TEvWriteResult::BuildCompleted(owner.TabletID(), GetTxId()); ctx.Send(TxInfo.Source, result.release(), 0, TxInfo.Cookie); return true; } - virtual bool Abort(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc) override { + virtual bool ExecuteOnAbort(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc) override { return owner.OperationsManager->AbortTransaction(owner, GetTxId(), txc); } + virtual bool CompleteOnAbort(TColumnShard& /*owner*/, const TActorContext& /*ctx*/) override { + return true; + } + private: ui64 LockId = 0; }; diff --git a/ydb/core/tx/columnshard/transactions/operators/long_tx_write.h b/ydb/core/tx/columnshard/transactions/operators/long_tx_write.h index 7ef86171f31..69622fc5031 100644 --- a/ydb/core/tx/columnshard/transactions/operators/long_tx_write.h +++ b/ydb/core/tx/columnshard/transactions/operators/long_tx_write.h @@ -4,14 +4,14 @@ namespace NKikimr::NColumnShard { - class TLongTxTransactionOperator : public TTxController::ITransactionOperatior { - using TBase = TTxController::ITransactionOperatior; + class TLongTxTransactionOperator : public TTxController::ITransactionOperator { + using TBase = TTxController::ITransactionOperator; using TProposeResult = TTxController::TProposeResult; static inline auto Registrator = TFactory::TRegistrator<TLongTxTransactionOperator>(NKikimrTxColumnShard::TX_KIND_COMMIT); public: using TBase::TBase; - bool Parse(const TString& data) override { + bool Parse(TColumnShard& /*owner*/, const TString& data) override { NKikimrTxColumnShard::TCommitTxBody commitTxBody; if (!commitTxBody.ParseFromString(data)) { return false; @@ -31,7 +31,7 @@ namespace NKikimr::NColumnShard { } } - TProposeResult Propose(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& /*txc*/, bool /*proposed*/) const override { + TProposeResult ExecuteOnPropose(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& /*txc*/) const override { if (WriteIds.empty()) { return TProposeResult(NKikimrTxColumnShard::EResultStatus::ERROR, TStringBuilder() << "Commit TxId# " << GetTxId() << " has an empty list of write ids"); @@ -55,7 +55,11 @@ namespace NKikimr::NColumnShard { return TProposeResult();; } - bool Progress(TColumnShard& owner, const NOlap::TSnapshot& version, NTabletFlatExecutor::TTransactionContext& txc) override { + bool CompleteOnPropose(TColumnShard& /*owner*/, const TActorContext& /*ctx*/) const override { + return true; + } + + bool ExecuteOnProgress(TColumnShard& owner, const NOlap::TSnapshot& version, NTabletFlatExecutor::TTransactionContext& txc) override { TBlobGroupSelector dsGroupSelector(owner.Info()); NOlap::TDbWrapper dbTable(txc.DB, &dsGroupSelector); @@ -78,7 +82,7 @@ namespace NKikimr::NColumnShard { return true; } - bool Complete(TColumnShard& owner, const TActorContext& ctx) override { + bool CompleteOnProgress(TColumnShard& owner, const TActorContext& ctx) override { auto result = std::make_unique<TEvColumnShard::TEvProposeTransactionResult>( owner.TabletID(), TxInfo.TxKind, GetTxId(), NKikimrTxColumnShard::SUCCESS); result->Record.SetStep(TxInfo.PlanStep); @@ -86,7 +90,7 @@ namespace NKikimr::NColumnShard { return true; } - bool Abort(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc) override { + virtual bool ExecuteOnAbort(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc) override { NIceDb::TNiceDb db(txc.DB); for (TWriteId writeId : WriteIds) { owner.RemoveLongTxWrite(db, writeId, GetTxId()); @@ -96,6 +100,9 @@ namespace NKikimr::NColumnShard { owner.InsertTable->Abort(dbTable, WriteIds); return true; } + virtual bool CompleteOnAbort(TColumnShard& /*owner*/, const TActorContext& /*ctx*/) override { + return true; + } private: THashSet<TWriteId> WriteIds; diff --git a/ydb/core/tx/columnshard/transactions/operators/schema.h b/ydb/core/tx/columnshard/transactions/operators/schema.h index deb2582c7b3..d5907e4574a 100644 --- a/ydb/core/tx/columnshard/transactions/operators/schema.h +++ b/ydb/core/tx/columnshard/transactions/operators/schema.h @@ -4,14 +4,14 @@ namespace NKikimr::NColumnShard { - class TSchemaTransactionOperator : public TTxController::ITransactionOperatior { - using TBase = TTxController::ITransactionOperatior; + class TSchemaTransactionOperator : public TTxController::ITransactionOperator { + using TBase = TTxController::ITransactionOperator; using TProposeResult = TTxController::TProposeResult; static inline auto Registrator = TFactory::TRegistrator<TSchemaTransactionOperator>(NKikimrTxColumnShard::TX_KIND_SCHEMA); public: using TBase::TBase; - virtual bool Parse(const TString& data) override { + virtual bool Parse(TColumnShard& /*owner*/, const TString& data) override { if (!SchemaTxBody.ParseFromString(data)) { return false; } @@ -22,7 +22,7 @@ namespace NKikimr::NColumnShard { return false; } - TProposeResult Propose(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc, bool /*proposed*/) const override { + TProposeResult ExecuteOnPropose(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc) const override { switch (SchemaTxBody.TxBody_case()) { case NKikimrTxColumnShard::TSchemaTxBody::kInitShard: { @@ -66,13 +66,17 @@ namespace NKikimr::NColumnShard { return TProposeResult(); } - virtual bool Progress(TColumnShard& owner, const NOlap::TSnapshot& version, NTabletFlatExecutor::TTransactionContext& txc) override { + virtual bool CompleteOnPropose(TColumnShard& /*owner*/, const TActorContext& /*ctx*/) const override { + return true; + } + + virtual bool ExecuteOnProgress(TColumnShard& owner, const NOlap::TSnapshot& version, NTabletFlatExecutor::TTransactionContext& txc) override { owner.RunSchemaTx(SchemaTxBody, version, txc); owner.ProtectSchemaSeqNo(SchemaTxBody.GetSeqNo(), txc); return true; } - virtual bool Complete(TColumnShard& owner, const TActorContext& ctx) override { + virtual bool CompleteOnProgress(TColumnShard& owner, const TActorContext& ctx) override { for (TActorId subscriber : NotifySubscribers) { auto event = MakeHolder<TEvColumnShard::TEvNotifyTxCompletionResult>(owner.TabletID(), GetTxId()); ctx.Send(subscriber, event.Release(), 0, 0); @@ -85,8 +89,10 @@ namespace NKikimr::NColumnShard { return true; } - virtual bool Abort(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc) override { - Y_UNUSED(owner, txc); + virtual bool ExecuteOnAbort(TColumnShard& /*owner*/, NTabletFlatExecutor::TTransactionContext& /*txc*/) override { + return true; + } + virtual bool CompleteOnAbort(TColumnShard& /*owner*/, const TActorContext& /*ctx*/) override { return true; } diff --git a/ydb/core/tx/columnshard/transactions/propose_transaction_base.cpp b/ydb/core/tx/columnshard/transactions/propose_transaction_base.cpp index ef50e2ae595..7f6c1911fda 100644 --- a/ydb/core/tx/columnshard/transactions/propose_transaction_base.cpp +++ b/ydb/core/tx/columnshard/transactions/propose_transaction_base.cpp @@ -6,8 +6,8 @@ namespace NKikimr::NColumnShard { void TProposeTransactionBase::ProposeTransaction(const TTxController::TBasicTxInfo& txInfo, const TString& txBody, const TActorId source, const ui64 cookie, TTransactionContext& txc) { - auto txOperator = TTxController::ITransactionOperatior::TFactory::MakeHolder(txInfo.TxKind, TTxController::TTxInfo(txInfo.TxKind, txInfo.TxId)); - if (!txOperator || !txOperator->Parse(txBody)) { + auto txOperator = TTxController::ITransactionOperator::TFactory::MakeHolder(txInfo.TxKind, TTxController::TTxInfo(txInfo.TxKind, txInfo.TxId)); + if (!txOperator || !txOperator->Parse(*Self, txBody)) { TTxController::TProposeResult proposeResult(NKikimrTxColumnShard::EResultStatus::ERROR, TStringBuilder() << "Error processing commit TxId# " << txInfo.TxId << (txOperator ? ". Parsing error " : ". Unknown operator for txKind")); OnProposeError(proposeResult, txInfo); @@ -23,7 +23,7 @@ namespace NKikimr::NColumnShard { TTxController::TProposeResult proposeResult; OnProposeResult(proposeResult, *txInfoPtr); } else { - auto proposeResult = txOperator->Propose(*Self, txc, false); + auto proposeResult = txOperator->ExecuteOnPropose(*Self, txc); if (!!proposeResult) { const auto fullTxInfo = txOperator->TxWithDeadline() ? Self->GetProgressTxController().RegisterTxWithDeadline(txInfo.TxId, txInfo.TxKind, txBody, source, cookie, txc) : Self->GetProgressTxController().RegisterTx(txInfo.TxId, txInfo.TxKind, txBody, source, cookie, txc); @@ -34,4 +34,14 @@ namespace NKikimr::NColumnShard { } } } + + void TProposeTransactionBase::CompleteTransaction(const ui64 txId, const TActorContext& ctx) { + auto txOperator = Self->GetProgressTxController().GetTxOperator(txId); + if (!txOperator) { + AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("error", "cannot found txOperator in propose transaction base")("tx_id", txId); + } else { + txOperator->CompleteOnPropose(*Self, ctx); + } + } + } diff --git a/ydb/core/tx/columnshard/transactions/propose_transaction_base.h b/ydb/core/tx/columnshard/transactions/propose_transaction_base.h index 7657c4aff47..7502fb677ec 100644 --- a/ydb/core/tx/columnshard/transactions/propose_transaction_base.h +++ b/ydb/core/tx/columnshard/transactions/propose_transaction_base.h @@ -13,6 +13,7 @@ public: protected: void ProposeTransaction(const TTxController::TBasicTxInfo& txInfo, const TString& txBody, const TActorId source, const ui64 cookie, TTransactionContext& txc); + void CompleteTransaction(const ui64 txId, const TActorContext& ctx); virtual void OnProposeResult(TTxController::TProposeResult& proposeResult, const TTxController::TTxInfo& txInfo) = 0; virtual void OnProposeError(TTxController::TProposeResult& proposeResult, const TTxController::TBasicTxInfo& txInfo) = 0; diff --git a/ydb/core/tx/columnshard/transactions/tx_controller.cpp b/ydb/core/tx/columnshard/transactions/tx_controller.cpp index 43a56ecf348..087e79394e3 100644 --- a/ydb/core/tx/columnshard/transactions/tx_controller.cpp +++ b/ydb/core/tx/columnshard/transactions/tx_controller.cpp @@ -65,9 +65,9 @@ bool TTxController::Load(NTabletFlatExecutor::TTransactionContext& txc) { } const TString txBody = rowset.GetValue<Schema::TxInfo::TxBody>(); - ITransactionOperatior::TPtr txOperator(ITransactionOperatior::TFactory::Construct(txInfo.TxKind, txInfo)); + ITransactionOperator::TPtr txOperator(ITransactionOperator::TFactory::Construct(txInfo.TxKind, txInfo)); Y_ABORT_UNLESS(!!txOperator); - Y_ABORT_UNLESS(txOperator->Parse(txBody)); + Y_ABORT_UNLESS(txOperator->Parse(Owner, txBody)); Operators[txId] = txOperator; if (!rowset.Next()) { @@ -77,7 +77,7 @@ bool TTxController::Load(NTabletFlatExecutor::TTransactionContext& txc) { return true; } -TTxController::ITransactionOperatior::TPtr TTxController::GetTxOperator(const ui64 txId) { +TTxController::ITransactionOperator::TPtr TTxController::GetTxOperator(const ui64 txId) { auto it = Operators.find(txId); if(it == Operators.end()) { return nullptr; @@ -85,7 +85,7 @@ TTxController::ITransactionOperatior::TPtr TTxController::GetTxOperator(const ui return it->second; } -TTxController::ITransactionOperatior::TPtr TTxController::GetVerifiedTxOperator(const ui64 txId) { +TTxController::ITransactionOperator::TPtr TTxController::GetVerifiedTxOperator(const ui64 txId) { auto it = Operators.find(txId); AFL_VERIFY(it != Operators.end())("tx_id", txId); return it->second; @@ -99,9 +99,9 @@ TTxController::TTxInfo TTxController::RegisterTx(const ui64 txId, const NKikimrT txInfo.Source = source; txInfo.Cookie = cookie; - ITransactionOperatior::TPtr txOperator(ITransactionOperatior::TFactory::Construct(txInfo.TxKind, txInfo)); + ITransactionOperator::TPtr txOperator(ITransactionOperator::TFactory::Construct(txInfo.TxKind, txInfo)); Y_ABORT_UNLESS(!!txOperator); - Y_ABORT_UNLESS(txOperator->Parse(txBody)); + Y_ABORT_UNLESS(txOperator->Parse(Owner, txBody)); Operators[txId] = txOperator; Schema::SaveTxInfo(db, txInfo.TxId, txInfo.TxKind, txBody, Max<ui64>(), txInfo.Source, txInfo.Cookie); @@ -118,9 +118,9 @@ TTxController::TTxInfo TTxController::RegisterTxWithDeadline(const ui64 txId, co txInfo.MinStep = GetAllowedStep(); txInfo.MaxStep = txInfo.MinStep + MaxCommitTxDelay.MilliSeconds(); - ITransactionOperatior::TPtr txOperator(ITransactionOperatior::TFactory::Construct(txInfo.TxKind, txInfo)); + ITransactionOperator::TPtr txOperator(ITransactionOperator::TFactory::Construct(txInfo.TxKind, txInfo)); Y_ABORT_UNLESS(!!txOperator); - Y_ABORT_UNLESS(txOperator->Parse(txBody)); + Y_ABORT_UNLESS(txOperator->Parse(Owner, txBody)); Operators[txId] = txOperator; Schema::SaveTxInfo(db, txInfo.TxId, txInfo.TxKind, txBody, txInfo.MaxStep, txInfo.Source, txInfo.Cookie); @@ -137,7 +137,8 @@ bool TTxController::AbortTx(const ui64 txId, NTabletFlatExecutor::TTransactionCo auto opIt = Operators.find(txId); Y_ABORT_UNLESS(opIt != Operators.end()); - opIt->second->Abort(Owner, txc); + opIt->second->ExecuteOnAbort(Owner, txc); + opIt->second->CompleteOnAbort(Owner, NActors::TActivationContext::AsActorContext()); if (it->second.MaxStep != Max<ui64>()) { DeadlineQueue.erase(TPlanQueueItem(it->second.MaxStep, txId)); @@ -149,25 +150,41 @@ bool TTxController::AbortTx(const ui64 txId, NTabletFlatExecutor::TTransactionCo return true; } -bool TTxController::CancelTx(const ui64 txId, NTabletFlatExecutor::TTransactionContext& txc) { +bool TTxController::CompleteOnCancel(const ui64 txId, const TActorContext& ctx) { auto it = BasicTxInfo.find(txId); if (it == BasicTxInfo.end()) { return true; } if (it->second.PlanStep != 0) { - // Cannot cancel planned transaction return false; } auto opIt = Operators.find(txId); Y_ABORT_UNLESS(opIt != Operators.end()); - opIt->second->Abort(Owner, txc); + opIt->second->CompleteOnAbort(Owner, ctx); if (it->second.MaxStep != Max<ui64>()) { DeadlineQueue.erase(TPlanQueueItem(it->second.MaxStep, txId)); } BasicTxInfo.erase(it); Operators.erase(txId); + return true; +} + +bool TTxController::ExecuteOnCancel(const ui64 txId, NTabletFlatExecutor::TTransactionContext& txc) { + auto it = BasicTxInfo.find(txId); + if (it == BasicTxInfo.end()) { + return true; + } + if (it->second.PlanStep != 0) { + // Cannot cancel planned transaction + return false; + } + + auto opIt = Operators.find(txId); + Y_ABORT_UNLESS(opIt != Operators.end()); + opIt->second->ExecuteOnAbort(Owner, txc); + NIceDb::TNiceDb db(txc.DB); Schema::EraseTxInfo(db, txId); return true; @@ -188,13 +205,12 @@ std::optional<TTxController::TTxInfo> TTxController::StartPlannedTx() { void TTxController::FinishPlannedTx(const ui64 txId, NTabletFlatExecutor::TTransactionContext& txc) { NIceDb::TNiceDb db(txc.DB); - - BasicTxInfo.erase(txId); - Operators.erase(txId); Schema::EraseTxInfo(db, txId); } void TTxController::CompleteRunningTx(const TPlanQueueItem& txItem) { + AFL_VERIFY(BasicTxInfo.erase(txItem.TxId)); + AFL_VERIFY(Operators.erase(txItem.TxId)); AFL_VERIFY(RunningQueue.erase(txItem))("info", txItem.DebugString()); } diff --git a/ydb/core/tx/columnshard/transactions/tx_controller.h b/ydb/core/tx/columnshard/transactions/tx_controller.h index d48bd504a1f..5a54471c87c 100644 --- a/ydb/core/tx/columnshard/transactions/tx_controller.h +++ b/ydb/core/tx/columnshard/transactions/tx_controller.h @@ -71,14 +71,14 @@ public: } }; - class ITransactionOperatior { + class ITransactionOperator { protected: TTxInfo TxInfo; public: - using TPtr = std::shared_ptr<ITransactionOperatior>; - using TFactory = NObjectFactory::TParametrizedObjectFactory<ITransactionOperatior, NKikimrTxColumnShard::ETransactionKind, TTxInfo>; + using TPtr = std::shared_ptr<ITransactionOperator>; + using TFactory = NObjectFactory::TParametrizedObjectFactory<ITransactionOperator, NKikimrTxColumnShard::ETransactionKind, TTxInfo>; - ITransactionOperatior(const TTxInfo& txInfo) + ITransactionOperator(const TTxInfo& txInfo) : TxInfo(txInfo) {} @@ -86,18 +86,22 @@ public: return TxInfo.TxId; } - virtual ~ITransactionOperatior() {} + virtual ~ITransactionOperator() {} virtual bool TxWithDeadline() const { return true; } - virtual bool Parse(const TString& data) = 0; - virtual TProposeResult Propose(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc, bool proposed) const = 0; + virtual bool Parse(TColumnShard& owner, const TString& data) = 0; + virtual TProposeResult ExecuteOnPropose(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc) const = 0; + virtual bool CompleteOnPropose(TColumnShard& owner, const TActorContext& ctx) const = 0; + + virtual bool ExecuteOnProgress(TColumnShard& owner, const NOlap::TSnapshot& version, NTabletFlatExecutor::TTransactionContext& txc) = 0; + virtual bool CompleteOnProgress(TColumnShard& owner, const TActorContext& ctx) = 0; + + virtual bool ExecuteOnAbort(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc) = 0; + virtual bool CompleteOnAbort(TColumnShard& owner, const TActorContext& ctx) = 0; - virtual bool Progress(TColumnShard& owner, const NOlap::TSnapshot& version, NTabletFlatExecutor::TTransactionContext& txc) = 0; - virtual bool Abort(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc) = 0; - virtual bool Complete(TColumnShard& owner, const TActorContext& ctx) = 0; virtual void RegisterSubscriber(const TActorId&) { AFL_VERIFY(false)("message", "Not implemented"); }; @@ -112,7 +116,7 @@ private: std::set<TPlanQueueItem> PlanQueue; std::set<TPlanQueueItem> RunningQueue; - THashMap<ui64, ITransactionOperatior::TPtr> Operators; + THashMap<ui64, ITransactionOperator::TPtr> Operators; private: ui64 GetAllowedStep() const; @@ -121,8 +125,8 @@ private: public: TTxController(TColumnShard& owner); - ITransactionOperatior::TPtr GetTxOperator(const ui64 txId); - ITransactionOperatior::TPtr GetVerifiedTxOperator(const ui64 txId); + ITransactionOperator::TPtr GetTxOperator(const ui64 txId); + ITransactionOperator::TPtr GetVerifiedTxOperator(const ui64 txId); ui64 GetMemoryUsage() const; bool HaveOutdatedTxs() const; @@ -132,7 +136,8 @@ public: TTxInfo RegisterTx(const ui64 txId, const NKikimrTxColumnShard::ETransactionKind& txKind, const TString& txBody, const TActorId& source, const ui64 cookie, NTabletFlatExecutor::TTransactionContext& txc); TTxInfo RegisterTxWithDeadline(const ui64 txId, const NKikimrTxColumnShard::ETransactionKind& txKind, const TString& txBody, const TActorId& source, const ui64 cookie, NTabletFlatExecutor::TTransactionContext& txc); - bool CancelTx(const ui64 txId, NTabletFlatExecutor::TTransactionContext& txc); + bool ExecuteOnCancel(const ui64 txId, NTabletFlatExecutor::TTransactionContext& txc); + bool CompleteOnCancel(const ui64 txId, const TActorContext& ctx); std::optional<TTxInfo> StartPlannedTx(); void FinishPlannedTx(const ui64 txId, NTabletFlatExecutor::TTransactionContext& txc); diff --git a/ydb/core/tx/columnshard/ut_rw/ut_backup.cpp b/ydb/core/tx/columnshard/ut_rw/ut_backup.cpp index 377b9de19ce..a2f5b502d44 100644 --- a/ydb/core/tx/columnshard/ut_rw/ut_backup.cpp +++ b/ydb/core/tx/columnshard/ut_rw/ut_backup.cpp @@ -1,10 +1,12 @@ -#include "columnshard_ut_common.h" +#include <ydb/core/tx/columnshard/test_helper/columnshard_ut_common.h> #include <ydb/core/tx/columnshard/hooks/abstract/abstract.h> #include <ydb/core/tx/columnshard/hooks/testing/controller.h> #include <ydb/core/tx/columnshard/test_helper/controllers.h> +#include <ydb/core/tx/columnshard/columnshard.h> #include <ydb/core/tx/columnshard/operations/write_data.h> +#include <ydb/core/tx/tx_processing.h> #include <ydb/core/wrappers/fake_storage.h> diff --git a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp index 65277b9f151..87516e83008 100644 --- a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp +++ b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp @@ -1,10 +1,11 @@ -#include "columnshard_ut_common.h" +#include <ydb/core/tx/columnshard/test_helper/columnshard_ut_common.h> #include <ydb/core/kqp/compute_actor/kqp_compute_events.h> #include <ydb/core/base/blobstorage.h> #include <util/string/printf.h> #include <arrow/api.h> #include <arrow/ipc/reader.h> #include <ydb/library/yverify_stream/yverify_stream.h> +#include <ydb/core/tx/columnshard/columnshard_impl.h> #include <ydb/core/tx/columnshard/engines/changes/with_appended.h> #include <ydb/core/tx/columnshard/engines/changes/compaction.h> #include <ydb/core/tx/columnshard/engines/changes/cleanup_portions.h> @@ -12,6 +13,7 @@ #include <ydb/core/tx/columnshard/hooks/abstract/abstract.h> #include <ydb/core/tx/columnshard/hooks/testing/controller.h> #include <ydb/core/tx/columnshard/engines/portions/portion_info.h> +#include <ydb/core/tx/columnshard/test_helper/controllers.h> #include <ydb/core/tx/columnshard/common/tests/shard_reader.h> #include <ydb/library/actors/protos/unittests.pb.h> #include <ydb/core/formats/arrow/simple_builder/filler.h> @@ -2155,7 +2157,9 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { void TestCompactionSplitGranuleImpl(const TestTableDescription& table, const TTestBlobOptions& testBlobOptions = {}) { TTestBasicRuntime runtime; TTester::Setup(runtime); - auto csDefaultControllerGuard = NKikimr::NYDBTest::TControllers::RegisterCSControllerGuard<TDefaultTestsController>(); + runtime.SetLogPriority(NKikimrServices::TX_COLUMNSHARD_SCAN, NActors::NLog::PRI_NOTICE); + auto csDefaultControllerGuard = NKikimr::NYDBTest::TControllers::RegisterCSControllerGuard<NOlap::TWaitCompactionController>(); + csDefaultControllerGuard->SetSmallSizeDetector(1LLU << 20); TActorId sender = runtime.AllocateEdgeActor(); CreateTestBootstrapper(runtime, CreateTestTabletInfo(TTestTxConfig::TxTablet0, TTabletTypes::ColumnShard), &CreateColumnShard); @@ -2273,7 +2277,9 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { RebootTablet(runtime, TTestTxConfig::TxTablet0, sender); } - { // Get index stats + const TInstant start = TInstant::Now(); + bool success = false; + while (!success && TInstant::Now() - start < TDuration::Seconds(30)) { // Get index stats ScanIndexStats(runtime, sender, {tableId, 42}, NOlap::TSnapshot(planStep, txId), 0); auto scanInited = runtime.GrabEdgeEvent<NKqp::TEvKqpCompute::TEvScanInitActor>(handle); auto& msg = scanInited->Record; @@ -2283,7 +2289,6 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { ui64 sumCompactedRows = 0; ui64 sumInsertedBytes = 0; ui64 sumInsertedRows = 0; - std::optional<ui32> keyColumnId; while (true) { ui32 resultLimit = 1024 * 1024; runtime.Send(new IEventHandle(scanActorId, sender, new NKqp::TEvKqpCompute::TEvScanDataAck(resultLimit, 0, 1))); @@ -2294,15 +2299,12 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { } UNIT_ASSERT(scan->ArrowBatch); auto batchStats = NArrow::ToBatch(scan->ArrowBatch, true); - // Cerr << batchStats->ToString() << Endl; - for (ui32 i = 0; i < batchStats->num_rows(); ++i) { auto paths = batchStats->GetColumnByName("PathId"); auto kinds = batchStats->GetColumnByName("Kind"); auto rows = batchStats->GetColumnByName("Rows"); - auto bytes = batchStats->GetColumnByName("BlobRangeSize"); - auto rawBytes = batchStats->GetColumnByName("RawBytes"); - auto internalColumnIds = batchStats->GetColumnByName("InternalEntityId"); + auto bytes = batchStats->GetColumnByName("ColumnBlobBytes"); + auto rawBytes = batchStats->GetColumnByName("ColumnRawBytes"); auto activities = batchStats->GetColumnByName("Activity"); AFL_VERIFY(activities); @@ -2312,30 +2314,21 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { ui64 numRows = static_cast<arrow::UInt64Array&>(*rows).Value(i); ui64 numBytes = static_cast<arrow::UInt64Array&>(*bytes).Value(i); ui64 numRawBytes = static_cast<arrow::UInt64Array&>(*rawBytes).Value(i); - ui32 internalColumnId = static_cast<arrow::UInt32Array&>(*internalColumnIds).Value(i); bool activity = static_cast<arrow::BooleanArray&>(*activities).Value(i); if (!activity) { continue; } - if (!keyColumnId) { - keyColumnId = internalColumnId; - } Cerr << "[" << __LINE__ << "] " << activity << " " << table.Pk[0].GetType().GetTypeId() << " " << pathId << " " << kindStr << " " << numRows << " " << numBytes << " " << numRawBytes << "\n"; if (pathId == tableId) { - if (kindStr == ::ToString(NOlap::NPortion::EProduced::COMPACTED) || kindStr == ::ToString(NOlap::NPortion::EProduced::SPLIT_COMPACTED)) { + if (kindStr == ::ToString(NOlap::NPortion::EProduced::COMPACTED) || kindStr == ::ToString(NOlap::NPortion::EProduced::SPLIT_COMPACTED) || numBytes > (4LLU << 20)) { sumCompactedBytes += numBytes; - if (*keyColumnId == internalColumnId) { - sumCompactedRows += numRows; - } + sumCompactedRows += numRows; //UNIT_ASSERT(numRawBytes > numBytes); - } - if (kindStr == ::ToString(NOlap::NPortion::EProduced::INSERTED)) { + } else if (kindStr == ::ToString(NOlap::NPortion::EProduced::INSERTED)) { sumInsertedBytes += numBytes; - if (*keyColumnId == internalColumnId) { - sumInsertedRows += numRows; - } + sumInsertedRows += numRows; //UNIT_ASSERT(numRawBytes > numBytes); } } else { @@ -2346,12 +2339,16 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { } } Cerr << "compacted=" << sumCompactedRows << ";inserted=" << sumInsertedRows << ";expected=" << fullNumRows << ";" << Endl; - RebootTablet(runtime, TTestTxConfig::TxTablet0, sender); - AFL_VERIFY(sumCompactedRows == fullNumRows)("sum", sumCompactedRows)("full", fullNumRows); - UNIT_ASSERT(sumCompactedRows < sumCompactedBytes); - UNIT_ASSERT(sumInsertedRows == 0); - UNIT_ASSERT(sumInsertedBytes == 0); + if (!sumInsertedRows && sumCompactedRows == fullNumRows) { + success = true; + RebootTablet(runtime, TTestTxConfig::TxTablet0, sender); + UNIT_ASSERT(sumCompactedRows < sumCompactedBytes); + UNIT_ASSERT(sumInsertedBytes == 0); + } else { + Wakeup(runtime, sender, TTestTxConfig::TxTablet0); + } } + AFL_VERIFY(success); } void TestCompactionSplitGranule(const TTypeId typeId) { diff --git a/ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp b/ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp index 29c748866ea..bda23a87d81 100644 --- a/ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp +++ b/ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp @@ -1,4 +1,5 @@ -#include "columnshard_ut_common.h" +#include <ydb/core/tx/columnshard/columnshard_schema.h> +#include <ydb/core/tx/columnshard/test_helper/columnshard_ut_common.h> #include <ydb/core/tx/columnshard/hooks/abstract/abstract.h> #include <ydb/core/tx/columnshard/hooks/testing/controller.h> @@ -233,6 +234,11 @@ Y_UNIT_TEST_SUITE(Normalizers) { { auto readResult = ReadAllAsBatch(runtime, tableId, NOlap::TSnapshot(11, txId), schema); UNIT_ASSERT_VALUES_EQUAL(readResult->num_rows(), 20048); + while (!csControllerGuard->GetInsertFinishedCounter().Val()) { + Cerr << csControllerGuard->GetInsertStartedCounter().Val() << Endl; + Wakeup(runtime, sender, TTestTxConfig::TxTablet0); + runtime.SimulateSleep(TDuration::Seconds(1)); + } } RebootTablet(runtime, TTestTxConfig::TxTablet0, sender); diff --git a/ydb/core/tx/columnshard/ut_rw/ya.make b/ydb/core/tx/columnshard/ut_rw/ya.make index 8f5af7869b5..d03099069b7 100644 --- a/ydb/core/tx/columnshard/ut_rw/ya.make +++ b/ydb/core/tx/columnshard/ut_rw/ya.make @@ -32,7 +32,6 @@ PEERDIR( YQL_LAST_ABI_VERSION() SRCS( - columnshard_ut_common.cpp ut_columnshard_read_write.cpp ut_normalizer.cpp ut_backup.cpp diff --git a/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp b/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp index 7574f94793c..caa17144253 100644 --- a/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp +++ b/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp @@ -1,4 +1,4 @@ -#include "columnshard_ut_common.h" +#include <ydb/core/tx/columnshard/test_helper/columnshard_ut_common.h> #include <ydb/core/base/tablet.h> #include <ydb/core/wrappers/s3_wrapper.h> #include <ydb/services/metadata/service.h> diff --git a/ydb/core/tx/columnshard/ut_schema/ya.make b/ydb/core/tx/columnshard/ut_schema/ya.make index d3b7fdd5d84..35d906ee205 100644 --- a/ydb/core/tx/columnshard/ut_schema/ya.make +++ b/ydb/core/tx/columnshard/ut_schema/ya.make @@ -30,7 +30,6 @@ PEERDIR( YQL_LAST_ABI_VERSION() SRCS( - columnshard_ut_common.cpp ut_columnshard_schema.cpp ) diff --git a/ydb/core/tx/schemeshard/ut_olap/ut_olap.cpp b/ydb/core/tx/schemeshard/ut_olap/ut_olap.cpp index 31022e75558..c53fdeb5842 100644 --- a/ydb/core/tx/schemeshard/ut_olap/ut_olap.cpp +++ b/ydb/core/tx/schemeshard/ut_olap/ut_olap.cpp @@ -1,6 +1,6 @@ #include <ydb/core/tx/schemeshard/ut_helpers/helpers.h> #include <ydb/core/tx/columnshard/columnshard.h> -#include <ydb/core/tx/columnshard/columnshard_ut_common.h> +#include <ydb/core/tx/columnshard/test_helper/columnshard_ut_common.h> #include <ydb/core/formats/arrow/arrow_helpers.h> #include <ydb/core/formats/arrow/arrow_batch_builder.h> diff --git a/ydb/core/tx/schemeshard/ut_olap/ya.make b/ydb/core/tx/schemeshard/ut_olap/ya.make index c502f450c32..1393af86188 100644 --- a/ydb/core/tx/schemeshard/ut_olap/ya.make +++ b/ydb/core/tx/schemeshard/ut_olap/ya.make @@ -30,7 +30,6 @@ YQL_LAST_ABI_VERSION() SRCS( ut_olap.cpp - ydb/core/tx/columnshard/columnshard_ut_common.cpp ) END() |