aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov333 <ivanmorozov@ydb.tech>2024-04-24 15:06:08 +0300
committerGitHub <noreply@github.com>2024-04-24 15:06:08 +0300
commit16a42e93c539c43df1c9d8ae3b64b416b890d1c9 (patch)
tree3bf09948a64c570101c052dba9b70483d8ff3810
parent087c843e70c2dfebdf7660706c7e5eed2b946e97 (diff)
downloadydb-16a42e93c539c43df1c9d8ae3b64b416b890d1c9.tar.gz
rename and additional methods for tx operations (#4003)
-rw-r--r--ydb/core/kqp/ut/common/columnshard.cpp1
-rw-r--r--ydb/core/kqp/ut/common/columnshard.h2
-rw-r--r--ydb/core/kqp/ut/common/ya.make1
-rw-r--r--ydb/core/kqp/ut/olap/sys_view_ut.cpp220
-rw-r--r--ydb/core/kqp/ut/olap/ya.make2
-rw-r--r--ydb/core/kqp/ut/scheme/kqp_constraints_ut.cpp3
-rw-r--r--ydb/core/tx/columnshard/background_controller.h13
-rw-r--r--ydb/core/tx/columnshard/blobs_action/transaction/tx_gc_insert_table.cpp1
-rw-r--r--ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp1
-rw-r--r--ydb/core/tx/columnshard/columnshard__progress_tx.cpp19
-rw-r--r--ydb/core/tx/columnshard/columnshard__propose_cancel.cpp7
-rw-r--r--ydb/core/tx/columnshard/columnshard__propose_transaction.cpp4
-rw-r--r--ydb/core/tx/columnshard/columnshard_impl.cpp9
-rw-r--r--ydb/core/tx/columnshard/engines/storage/granule.h3
-rw-r--r--ydb/core/tx/columnshard/engines/storage/optimizer/lbuckets/optimizer.h20
-rw-r--r--ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp4
-rw-r--r--ydb/core/tx/columnshard/engines/ut/ut_program.cpp2
-rw-r--r--ydb/core/tx/columnshard/engines/ut/ya.make1
-rw-r--r--ydb/core/tx/columnshard/hooks/abstract/abstract.h4
-rw-r--r--ydb/core/tx/columnshard/hooks/testing/controller.h65
-rw-r--r--ydb/core/tx/columnshard/test_helper/columnshard_ut_common.cpp (renamed from ydb/core/tx/columnshard/columnshard_ut_common.cpp)14
-rw-r--r--ydb/core/tx/columnshard/test_helper/columnshard_ut_common.h (renamed from ydb/core/tx/columnshard/columnshard_ut_common.h)17
-rw-r--r--ydb/core/tx/columnshard/test_helper/controllers.cpp2
-rw-r--r--ydb/core/tx/columnshard/test_helper/controllers.h10
-rw-r--r--ydb/core/tx/columnshard/test_helper/ya.make1
-rw-r--r--ydb/core/tx/columnshard/transactions/operators/backup.cpp10
-rw-r--r--ydb/core/tx/columnshard/transactions/operators/backup.h20
-rw-r--r--ydb/core/tx/columnshard/transactions/operators/ev_write.h22
-rw-r--r--ydb/core/tx/columnshard/transactions/operators/long_tx_write.h21
-rw-r--r--ydb/core/tx/columnshard/transactions/operators/schema.h22
-rw-r--r--ydb/core/tx/columnshard/transactions/propose_transaction_base.cpp16
-rw-r--r--ydb/core/tx/columnshard/transactions/propose_transaction_base.h1
-rw-r--r--ydb/core/tx/columnshard/transactions/tx_controller.cpp46
-rw-r--r--ydb/core/tx/columnshard/transactions/tx_controller.h33
-rw-r--r--ydb/core/tx/columnshard/ut_rw/ut_backup.cpp4
-rw-r--r--ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp51
-rw-r--r--ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp8
-rw-r--r--ydb/core/tx/columnshard/ut_rw/ya.make1
-rw-r--r--ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp2
-rw-r--r--ydb/core/tx/columnshard/ut_schema/ya.make1
-rw-r--r--ydb/core/tx/schemeshard/ut_olap/ut_olap.cpp2
-rw-r--r--ydb/core/tx/schemeshard/ut_olap/ya.make1
42 files changed, 440 insertions, 247 deletions
diff --git a/ydb/core/kqp/ut/common/columnshard.cpp b/ydb/core/kqp/ut/common/columnshard.cpp
index bef7d6cb91a..441be48503c 100644
--- a/ydb/core/kqp/ut/common/columnshard.cpp
+++ b/ydb/core/kqp/ut/common/columnshard.cpp
@@ -1,5 +1,6 @@
#include "columnshard.h"
#include <ydb/core/testlib/cs_helper.h>
+#include <ydb/core/base/tablet_pipecache.h>
extern "C" {
#include <ydb/library/yql/parser/pg_wrapper/postgresql/src/include/catalog/pg_type_d.h>
diff --git a/ydb/core/kqp/ut/common/columnshard.h b/ydb/core/kqp/ut/common/columnshard.h
index 34046f1b110..a938e91b4d4 100644
--- a/ydb/core/kqp/ut/common/columnshard.h
+++ b/ydb/core/kqp/ut/common/columnshard.h
@@ -5,7 +5,7 @@
#include <ydb/public/lib/scheme_types/scheme_type_id.h>
#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
#include <ydb/public/sdk/cpp/client/ydb_types/status_codes.h>
-#include <ydb/core/tx/columnshard/columnshard_ut_common.h>
+#include <ydb/core/tx/columnshard/test_helper/columnshard_ut_common.h>
#include <ydb/core/formats/arrow/simple_builder/filler.h>
#include <ydb/core/formats/arrow/simple_builder/array.h>
diff --git a/ydb/core/kqp/ut/common/ya.make b/ydb/core/kqp/ut/common/ya.make
index 97281a7aad6..db2b3dbfc3a 100644
--- a/ydb/core/kqp/ut/common/ya.make
+++ b/ydb/core/kqp/ut/common/ya.make
@@ -18,6 +18,7 @@ PEERDIR(
ydb/library/yql/udfs/common/string
ydb/library/yql/utils/backtrace
ydb/public/lib/yson_value
+ ydb/core/tx/columnshard/test_helper
ydb/public/sdk/cpp/client/draft
ydb/public/sdk/cpp/client/ydb_query
ydb/public/sdk/cpp/client/ydb_proto
diff --git a/ydb/core/kqp/ut/olap/sys_view_ut.cpp b/ydb/core/kqp/ut/olap/sys_view_ut.cpp
index 866563f088f..4242f767629 100644
--- a/ydb/core/kqp/ut/olap/sys_view_ut.cpp
+++ b/ydb/core/kqp/ut/olap/sys_view_ut.cpp
@@ -6,6 +6,7 @@
#include <library/cpp/testing/unittest/registar.h>
#include <ydb/core/tx/columnshard/hooks/testing/controller.h>
+#include <ydb/core/tx/columnshard/test_helper/controllers.h>
namespace NKikimr::NKqp {
@@ -14,57 +15,52 @@ Y_UNIT_TEST_SUITE(KqpOlapSysView) {
auto settings = TKikimrSettings()
.SetWithSampleTables(false);
TKikimrRunner kikimr(settings);
- static ui32 numKinds = 2;
- auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NYDBTest::NColumnShard::TController>();
+ auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NOlap::TWaitCompactionController>();
TLocalHelper(kikimr).CreateTestOlapTable();
for (ui64 i = 0; i < 100; ++i) {
WriteTestData(kikimr, "/Root/olapStore/olapTable", 0, 1000000 + i * 10000, 1000);
}
+ csController->WaitCompactions(TDuration::Seconds(10));
auto tableClient = kikimr.GetTableClient();
auto selectQuery = TString(R"(
SELECT PathId, Kind, TabletId, Sum(Rows) as Rows
- FROM `/Root/olapStore/.sys/store_primary_index_stats`
+ FROM `/Root/olapStore/.sys/store_primary_index_portion_stats`
GROUP BY PathId, Kind, TabletId
ORDER BY TabletId, Kind, PathId
)");
auto rows = ExecuteScanQuery(tableClient, selectQuery);
- UNIT_ASSERT_VALUES_EQUAL(rows.size(), numKinds*3);
+ UNIT_ASSERT_VALUES_EQUAL(rows.size(), 3);
UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows[0].at("PathId")), 3ull);
+ UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows[1].at("PathId")), 3ull);
+ UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows[2].at("PathId")), 3ull);
UNIT_ASSERT_VALUES_EQUAL(GetUtf8(rows[0].at("Kind")), "INSERTED");
- UNIT_ASSERT_GE(GetUint64(rows[0].at("TabletId")), 72075186224037888ull);
- UNIT_ASSERT_GE(GetUint64(rows[2].at("TabletId")), 72075186224037889ull);
- UNIT_ASSERT_GE(GetUint64(rows[4].at("TabletId")), 72075186224037890ull);
- UNIT_ASSERT_GE(GetUint64(rows[1].at("TabletId")), GetUint64(rows[0].at("TabletId")));
+ UNIT_ASSERT_VALUES_EQUAL(GetUtf8(rows[1].at("Kind")), "INSERTED");
UNIT_ASSERT_VALUES_EQUAL(GetUtf8(rows[2].at("Kind")), "INSERTED");
- UNIT_ASSERT_GE(GetUint64(rows[2].at("TabletId")), GetUint64(rows[1].at("TabletId")));
- UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows[3].at("PathId")), 3ull);
- UNIT_ASSERT_VALUES_EQUAL(GetUtf8(rows[3].at("Kind")), "SPLIT_COMPACTED");
- UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows[3].at("TabletId")), GetUint64(rows[2].at("TabletId")));
- UNIT_ASSERT_VALUES_EQUAL(GetUtf8(rows[4].at("Kind")), "INSERTED");
- UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows[4].at("TabletId")), GetUint64(rows[5].at("TabletId")));
- UNIT_ASSERT_GE(
- GetUint64(rows[0].at("Rows")) + GetUint64(rows[1].at("Rows")) + GetUint64(rows[2].at("Rows")) +
- GetUint64(rows[3].at("Rows")) + GetUint64(rows[4].at("Rows")) + GetUint64(rows[5].at("Rows")),
- 0.3*0.9*100*1000); // >= 90% of 100K inserted rows
+ UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows[0].at("TabletId")), 72075186224037888ull);
+ UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows[1].at("TabletId")), 72075186224037889ull);
+ UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows[2].at("TabletId")), 72075186224037890ull);
+ UNIT_ASSERT_VALUES_EQUAL(
+ GetUint64(rows[0].at("Rows")) + GetUint64(rows[1].at("Rows")) + GetUint64(rows[2].at("Rows")),
+ 100 * 1000); // >= 90% of 100K inserted rows
}
Y_UNIT_TEST(StatsSysViewTable) {
auto settings = TKikimrSettings()
.SetWithSampleTables(false);
TKikimrRunner kikimr(settings);
- auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NYDBTest::NColumnShard::TController>();
- static ui32 numKinds = 5;
+ auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NOlap::TWaitCompactionController>();
TLocalHelper(kikimr).CreateTestOlapTable("olapTable_1");
TLocalHelper(kikimr).CreateTestOlapTable("olapTable_2");
for (ui64 i = 0; i < 10; ++i) {
- WriteTestData(kikimr, "/Root/olapStore/olapTable_1", 0, 1000000 + i*10000, 1000);
- WriteTestData(kikimr, "/Root/olapStore/olapTable_2", 0, 1000000 + i*10000, 2000);
+ WriteTestData(kikimr, "/Root/olapStore/olapTable_1", 0, 1000000 + i * 10000, 1000);
+ WriteTestData(kikimr, "/Root/olapStore/olapTable_2", 0, 1000000 + i * 10000, 2000);
}
+ csController->WaitCompactions(TDuration::Seconds(10));
auto tableClient = kikimr.GetTableClient();
{
@@ -77,8 +73,7 @@ Y_UNIT_TEST_SUITE(KqpOlapSysView) {
auto rows = ExecuteScanQuery(tableClient, selectQuery);
- UNIT_ASSERT_GT(rows.size(), 1*numKinds);
- UNIT_ASSERT_LE(rows.size(), 3*numKinds);
+ UNIT_ASSERT_VALUES_EQUAL(rows.size(), 3);
UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows.front().at("PathId")), 3ull);
UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows.back().at("PathId")), 3ull);
}
@@ -92,8 +87,7 @@ Y_UNIT_TEST_SUITE(KqpOlapSysView) {
auto rows = ExecuteScanQuery(tableClient, selectQuery);
- UNIT_ASSERT_GT(rows.size(), 1*numKinds);
- UNIT_ASSERT_LE(rows.size(), 3*numKinds);
+ UNIT_ASSERT_VALUES_EQUAL(rows.size(), 3);
UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows.front().at("PathId")), 4ull);
UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows.back().at("PathId")), 4ull);
}
@@ -116,8 +110,7 @@ Y_UNIT_TEST_SUITE(KqpOlapSysView) {
ui64 rawBytesPK1;
ui64 bytesPK1;
{
- auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NYDBTest::NColumnShard::TController>();
- csController->SetPeriodicWakeupActivationPeriod(TDuration::Seconds(1));
+ auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NOlap::TWaitCompactionController>();
auto settings = TKikimrSettings()
.SetWithSampleTables(false);
TKikimrRunner kikimr(settings);
@@ -125,10 +118,11 @@ Y_UNIT_TEST_SUITE(KqpOlapSysView) {
TTypedLocalHelper helper("", kikimr, "olapTable", "olapStore12");
helper.CreateTestOlapTable();
helper.FillPKOnly(0, 800000);
+ csController->WaitCompactions(TDuration::Seconds(10));
helper.GetVolumes(rawBytesPK1, bytesPK1, false);
}
- auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NYDBTest::NColumnShard::TController>();
+ auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NOlap::TWaitCompactionController>();
ui64 rawBytesUnpack1PK = 0;
ui64 bytesUnpack1PK = 0;
ui64 rawBytesPackAndUnpack2PK;
@@ -144,6 +138,7 @@ Y_UNIT_TEST_SUITE(KqpOlapSysView) {
helper.CreateTestOlapTable();
NArrow::NConstruction::TStringPoolFiller sPool(groupsCount, 52);
helper.FillTable(sPool, 0, rowsCount);
+ csController->WaitCompactions(TDuration::Seconds(10));
helper.PrintCount();
{
auto d = helper.GetDistribution();
@@ -152,14 +147,12 @@ Y_UNIT_TEST_SUITE(KqpOlapSysView) {
Y_ABORT_UNLESS(d.GetMaxCount() - d.GetMinCount() <= 1);
}
helper.GetVolumes(rawBytesUnpack1PK, bytesUnpack1PK, false);
- Sleep(TDuration::Seconds(5));
auto tableClient = kikimr.GetTableClient();
helper.ExecuteSchemeQuery(TStringBuilder() << "ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=ALTER_COLUMN, NAME=field, `ENCODING.DICTIONARY.ENABLED`=`true`);");
helper.ExecuteSchemeQuery("ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=ALTER_COLUMN, NAME=field1, `ENCODING.DICTIONARY.ENABLED`=`true`);", NYdb::EStatus::SCHEME_ERROR);
helper.ExecuteSchemeQuery("ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=ALTER_COLUMN, NAME=field, `ENCODING.DICTIONARY.ENABLED1`=`true`);", NYdb::EStatus::GENERIC_ERROR);
- Sleep(TDuration::Seconds(5));
helper.FillTable(sPool, 1, rowsCount);
- Sleep(TDuration::Seconds(5));
+ csController->WaitCompactions(TDuration::Seconds(10));
{
helper.GetVolumes(rawBytesPackAndUnpack2PK, bytesPackAndUnpack2PK, false);
helper.PrintCount();
@@ -172,6 +165,7 @@ Y_UNIT_TEST_SUITE(KqpOlapSysView) {
}
}
helper.ExecuteSchemeQuery("ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=ALTER_COLUMN, NAME=field, `SERIALIZER.CLASS_NAME`=`ARROW_SERIALIZER`, `COMPRESSION.TYPE`=`zstd`);");
+ csController->WaitCompactions(TDuration::Seconds(10));
}
const ui64 rawBytesUnpack = rawBytesUnpack1PK - rawBytesPK1;
const ui64 bytesUnpack = bytesUnpack1PK - bytesPK1;
@@ -190,8 +184,7 @@ Y_UNIT_TEST_SUITE(KqpOlapSysView) {
Y_UNIT_TEST(StatsSysViewBytesPackActualization) {
ui64 rawBytesPK1;
ui64 bytesPK1;
- auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NYDBTest::NColumnShard::TController>();
- csController->SetPeriodicWakeupActivationPeriod(TDuration::Seconds(1));
+ auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NOlap::TWaitCompactionController>();
auto settings = TKikimrSettings()
.SetWithSampleTables(false);
TKikimrRunner kikimr(settings);
@@ -199,6 +192,8 @@ Y_UNIT_TEST_SUITE(KqpOlapSysView) {
TTypedLocalHelper helper("", kikimr, "olapTable", "olapStore");
helper.CreateTestOlapTable();
helper.FillPKOnly(0, 800000);
+ csController->WaitCompactions(TDuration::Seconds(10));
+
helper.GetVolumes(rawBytesPK1, bytesPK1, false, {"pk_int"});
auto tableClient = kikimr.GetTableClient();
{
@@ -226,16 +221,16 @@ Y_UNIT_TEST_SUITE(KqpOlapSysView) {
Y_UNIT_TEST(StatsSysViewBytesColumnActualization) {
ui64 rawBytes1;
ui64 bytes1;
- auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NYDBTest::NColumnShard::TController>();
- csController->SetPeriodicWakeupActivationPeriod(TDuration::Seconds(1));
- auto settings = TKikimrSettings()
- .SetWithSampleTables(false);
+ auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NOlap::TWaitCompactionController>();
+ auto settings = TKikimrSettings().SetWithSampleTables(false);
TKikimrRunner kikimr(settings);
Tests::NCommon::TLoggerInit(kikimr).Initialize();
TTypedLocalHelper helper("Utf8", kikimr);
helper.CreateTestOlapTable();
NArrow::NConstruction::TStringPoolFiller sPool(3, 52);
helper.FillTable(sPool, 0, 800000);
+ csController->WaitCompactions(TDuration::Seconds(10));
+
helper.GetVolumes(rawBytes1, bytes1, false, {"new_column_ui64"});
AFL_VERIFY(rawBytes1 == 0);
AFL_VERIFY(bytes1 == 0);
@@ -255,8 +250,7 @@ Y_UNIT_TEST_SUITE(KqpOlapSysView) {
Y_UNIT_TEST(StatsSysViewBytesDictActualization) {
ui64 rawBytes1;
ui64 bytes1;
- auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NYDBTest::NColumnShard::TController>();
- csController->SetPeriodicWakeupActivationPeriod(TDuration::Seconds(1));
+ auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NOlap::TWaitCompactionController>();
auto settings = TKikimrSettings()
.SetWithSampleTables(false);
TKikimrRunner kikimr(settings);
@@ -265,6 +259,8 @@ Y_UNIT_TEST_SUITE(KqpOlapSysView) {
helper.CreateTestOlapTable();
NArrow::NConstruction::TStringPoolFiller sPool(3, 52);
helper.FillTable(sPool, 0, 800000);
+ csController->WaitCompactions(TDuration::Seconds(10));
+
helper.GetVolumes(rawBytes1, bytes1, false, {"field"});
auto tableClient = kikimr.GetTableClient();
{
@@ -292,8 +288,7 @@ Y_UNIT_TEST_SUITE(KqpOlapSysView) {
Y_UNIT_TEST(StatsSysViewBytesDictStatActualization) {
ui64 rawBytes1;
ui64 bytes1;
- auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NYDBTest::NColumnShard::TController>();
- csController->SetPeriodicWakeupActivationPeriod(TDuration::Seconds(1));
+ auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NOlap::TWaitCompactionController>();
auto settings = TKikimrSettings().SetWithSampleTables(false);
TKikimrRunner kikimr(settings);
Tests::NCommon::TLoggerInit(kikimr).Initialize();
@@ -301,6 +296,8 @@ Y_UNIT_TEST_SUITE(KqpOlapSysView) {
helper.CreateTestOlapTable();
NArrow::NConstruction::TStringPoolFiller sPool(3, 52);
helper.FillTable(sPool, 0, 800000);
+ csController->WaitCompactions(TDuration::Seconds(10));
+
helper.GetVolumes(rawBytes1, bytes1, false, {"field"});
auto tableClient = kikimr.GetTableClient();
{
@@ -308,52 +305,73 @@ Y_UNIT_TEST_SUITE(KqpOlapSysView) {
helper.ExecuteSchemeQuery("ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_STAT, NAME=field_var, TYPE=variability, FEATURES=`{\"column_name\" : \"field\"}`);");
helper.ExecuteSchemeQuery("ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_STAT, NAME=pk_int_max, TYPE=max, FEATURES=`{\"column_name\" : \"pk_int\"}`);");
helper.ExecuteSchemeQuery("ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_OPTIONS, SCHEME_NEED_ACTUALIZATION=`true`);");
- csController->WaitActualization(TDuration::Seconds(10));
- ui64 rawBytes2;
- ui64 bytes2;
- helper.GetVolumes(rawBytes2, bytes2, false, {"field"});
- AFL_VERIFY(rawBytes2 == rawBytes1)("f1", rawBytes1)("f2", rawBytes2);
- AFL_VERIFY(bytes2 < bytes1 * 0.5)("f1", bytes1)("f2", bytes2);
- std::vector<NKikimrColumnShardStatisticsProto::TPortionStorage> stats;
- helper.GetStats(stats, true);
- for (auto&& i : stats) {
- AFL_VERIFY(i.ScalarsSize() == 2);
- AFL_VERIFY(i.GetScalars()[0].GetUint32() == 3);
- }
+ csController->WaitCondition(TDuration::Seconds(10), [&]() {
+ ui64 rawBytes2;
+ ui64 bytes2;
+ helper.GetVolumes(rawBytes2, bytes2, false, {"field"});
+ AFL_VERIFY(rawBytes2 == rawBytes1)("f1", rawBytes1)("f2", rawBytes2);
+ AFL_VERIFY(bytes2 < bytes1 * 0.5)("f1", bytes1)("f2", bytes2);
+ std::vector<NKikimrColumnShardStatisticsProto::TPortionStorage> stats;
+ helper.GetStats(stats, true);
+ for (auto&& i : stats) {
+ if (i.ScalarsSize() != 2) {
+ return false;
+ }
+ if (i.GetScalars()[0].GetUint32() != 3) {
+ return false;
+ }
+ }
+ return true;
+ }
+ );
}
{
helper.ExecuteSchemeQuery("ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=DROP_STAT, NAME=pk_int_max);");
helper.ExecuteSchemeQuery("ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_OPTIONS, SCHEME_NEED_ACTUALIZATION=`true`);");
- csController->WaitActualization(TDuration::Seconds(10));
- std::vector<NKikimrColumnShardStatisticsProto::TPortionStorage> stats;
- helper.GetStats(stats, true);
- for (auto&& i : stats) {
- AFL_VERIFY(i.ScalarsSize() == 1);
- AFL_VERIFY(i.GetScalars()[0].GetUint32() == 3);
- }
+ csController->WaitCondition(TDuration::Seconds(10), [&]() {
+ std::vector<NKikimrColumnShardStatisticsProto::TPortionStorage> stats;
+ helper.GetStats(stats, true);
+ for (auto&& i : stats) {
+ if (i.ScalarsSize() != 1) {
+ return false;
+ }
+ if (i.GetScalars()[0].GetUint32() != 3) {
+ return false;
+ }
+ }
+ return true;
+ });
}
{
helper.ExecuteSchemeQuery("ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_STAT, NAME=pk_int_max, TYPE=max, FEATURES=`{\"column_name\" : \"pk_int\"}`);");
helper.ExecuteSchemeQuery("ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_OPTIONS, SCHEME_NEED_ACTUALIZATION=`true`);");
- csController->WaitActualization(TDuration::Seconds(10));
- std::vector<NKikimrColumnShardStatisticsProto::TPortionStorage> stats;
- helper.GetStats(stats, true);
- for (auto&& i : stats) {
- AFL_VERIFY(i.ScalarsSize() == 2);
- AFL_VERIFY(i.GetScalars()[0].GetUint32() == 3);
- }
+ csController->WaitCondition(TDuration::Seconds(10), [&]() {
+ std::vector<NKikimrColumnShardStatisticsProto::TPortionStorage> stats;
+ helper.GetStats(stats, true);
+ for (auto&& i : stats) {
+ if (i.ScalarsSize() != 2) {
+ return false;
+ }
+ if (i.GetScalars()[0].GetUint32() != 3) {
+ return false;
+ }
+ }
+ return true;
+ }
+ );
}
}
Y_UNIT_TEST(StatsSysViewColumns) {
auto settings = TKikimrSettings().SetWithSampleTables(false);
- auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NYDBTest::NColumnShard::TController>();
+ auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NOlap::TWaitCompactionController>();
TKikimrRunner kikimr(settings);
TLocalHelper(kikimr.GetTestServer()).CreateTestOlapTable();
for (ui64 i = 0; i < 10; ++i) {
- WriteTestData(kikimr, "/Root/olapStore/olapTable", 0, 1000000 + i*10000, 2000);
+ WriteTestData(kikimr, "/Root/olapStore/olapTable", 0, 1000000 + i * 10000, 2000);
}
+ csController->WaitCompactions(TDuration::Seconds(10));
auto tableClient = kikimr.GetTableClient();
@@ -388,16 +406,15 @@ Y_UNIT_TEST_SUITE(KqpOlapSysView) {
}
{
auto selectQuery = TString(R"(
- SELECT Sum(Rows) as Rows, Kind, Sum(RawBytes) as RawBytes, Sum(Rows) as Rows2, Sum(Rows) as Rows3, PathId
- FROM `/Root/olapStore/.sys/store_primary_index_stats`
+ SELECT Sum(Rows) as Rows, Kind, Sum(ColumnRawBytes) as RawBytes, PathId
+ FROM `/Root/olapStore/.sys/store_primary_index_portion_stats`
GROUP BY Kind, PathId
- ORDER BY PathId, Kind, Rows3
+ ORDER BY PathId, Kind, Rows
)");
auto rows = ExecuteScanQuery(tableClient, selectQuery);
- UNIT_ASSERT_VALUES_EQUAL(rows.size(), 2);
- UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows[0].at("Rows2")), GetUint64(rows[0].at("Rows3")));
- UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows[1].at("Rows")), GetUint64(rows[1].at("Rows3")));
+ UNIT_ASSERT_VALUES_EQUAL(rows.size(), 1);
+ UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows[0].at("Rows")), 20000);
}
}
@@ -413,10 +430,11 @@ Y_UNIT_TEST_SUITE(KqpOlapSysView) {
TLocalHelper(kikimr).CreateTestOlapTable("olapTable_3");
for (ui64 i = 0; i < 10; ++i) {
- WriteTestData(kikimr, "/Root/olapStore/olapTable_1", 0, 1000000 + i*10000, 2000);
- WriteTestData(kikimr, "/Root/olapStore/olapTable_2", 0, 1000000 + i*10000, 3000);
- WriteTestData(kikimr, "/Root/olapStore/olapTable_3", 0, 1000000 + i*10000, 5000);
+ WriteTestData(kikimr, "/Root/olapStore/olapTable_1", 0, 1000000 + i * 10000, 2000);
+ WriteTestData(kikimr, "/Root/olapStore/olapTable_2", 0, 1000000 + i * 10000, 3000);
+ WriteTestData(kikimr, "/Root/olapStore/olapTable_3", 0, 1000000 + i * 10000, 5000);
}
+ csController->WaitCompactions(TDuration::Seconds(10));
auto tableClient = kikimr.GetTableClient();
@@ -461,12 +479,12 @@ Y_UNIT_TEST_SUITE(KqpOlapSysView) {
auto rows = ExecuteScanQuery(tableClient, selectQuery);
- ui32 numExpected = 3*3;
+ ui32 numExpected = 3 * 3;
UNIT_ASSERT_VALUES_EQUAL(rows.size(), numExpected);
UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows[0].at("PathId")), 5ull);
UNIT_ASSERT_VALUES_EQUAL(GetUtf8(rows[0].at("Kind")), "INSERTED");
- UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows[numExpected-1].at("PathId")), 3ull);
- UNIT_ASSERT_VALUES_EQUAL(GetUtf8(rows[numExpected-1].at("Kind")), "INSERTED");
+ UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows[numExpected - 1].at("PathId")), 3ull);
+ UNIT_ASSERT_VALUES_EQUAL(GetUtf8(rows[numExpected - 1].at("Kind")), "INSERTED");
}
{
@@ -484,24 +502,25 @@ Y_UNIT_TEST_SUITE(KqpOlapSysView) {
auto rows = ExecuteScanQuery(tableClient, selectQuery);
- ui32 numExpected = 2*3;
+ ui32 numExpected = 2 * 3;
UNIT_ASSERT_VALUES_EQUAL(rows.size(), numExpected);
UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows[0].at("PathId")), 5ull);
UNIT_ASSERT_VALUES_EQUAL(GetUtf8(rows[0].at("Kind")), "INSERTED");
- UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows[numExpected-1].at("PathId")), 3ull);
- UNIT_ASSERT_VALUES_EQUAL(GetUtf8(rows[numExpected-1].at("Kind")), "INSERTED");
+ UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows[numExpected - 1].at("PathId")), 3ull);
+ UNIT_ASSERT_VALUES_EQUAL(GetUtf8(rows[numExpected - 1].at("Kind")), "INSERTED");
}
}
Y_UNIT_TEST(StatsSysViewFilter) {
auto settings = TKikimrSettings().SetWithSampleTables(false);
TKikimrRunner kikimr(settings);
- auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NYDBTest::NColumnShard::TController>();
+ auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NOlap::TWaitCompactionController>();
TLocalHelper(kikimr.GetTestServer()).CreateTestOlapTable();
for (ui64 i = 0; i < 10; ++i) {
- WriteTestData(kikimr, "/Root/olapStore/olapTable", 0, 1000000 + i*10000, 2000);
+ WriteTestData(kikimr, "/Root/olapStore/olapTable", 0, 1000000 + i * 10000, 2000);
}
+ csController->WaitCompactions(TDuration::Seconds(10));
auto tableClient = kikimr.GetTableClient();
@@ -548,31 +567,32 @@ Y_UNIT_TEST_SUITE(KqpOlapSysView) {
auto selectQuery = TString(R"(
SELECT PathId, Kind, TabletId
FROM `/Root/olapStore/.sys/store_primary_index_stats`
- WHERE Kind IN ('SPLIT_COMPACTED', 'INACTIVE', 'EVICTED')
+ WHERE Kind IN ('SPLIT_COMPACTED', 'INACTIVE', 'EVICTED', 'INSERTED')
GROUP BY PathId, Kind, TabletId
ORDER BY PathId, Kind, TabletId;
)");
auto rows = ExecuteScanQuery(tableClient, selectQuery);
- UNIT_ASSERT_GE(rows.size(), 3);
+ UNIT_ASSERT_VALUES_EQUAL(rows.size(), 3);
}
}
Y_UNIT_TEST(StatsSysViewAggregation) {
auto settings = TKikimrSettings().SetWithSampleTables(false);
TKikimrRunner kikimr(settings);
- auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NYDBTest::NColumnShard::TController>();
+ auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NOlap::TWaitCompactionController>();
TLocalHelper(kikimr.GetTestServer()).CreateTestOlapTable("olapTable_1");
TLocalHelper(kikimr.GetTestServer()).CreateTestOlapTable("olapTable_2");
TLocalHelper(kikimr.GetTestServer()).CreateTestOlapTable("olapTable_3");
for (ui64 i = 0; i < 100; ++i) {
- WriteTestData(kikimr, "/Root/olapStore/olapTable_1", 0, 1000000 + i*10000, 1000);
- WriteTestData(kikimr, "/Root/olapStore/olapTable_2", 0, 1000000 + i*10000, 2000);
- WriteTestData(kikimr, "/Root/olapStore/olapTable_3", 0, 1000000 + i*10000, 3000);
+ WriteTestData(kikimr, "/Root/olapStore/olapTable_1", 0, 1000000 + i * 10000, 1000);
+ WriteTestData(kikimr, "/Root/olapStore/olapTable_2", 0, 1000000 + i * 10000, 2000);
+ WriteTestData(kikimr, "/Root/olapStore/olapTable_3", 0, 1000000 + i * 10000, 3000);
}
+ csController->WaitCompactions(TDuration::Seconds(10));
Tests::NCommon::TLoggerInit(kikimr).Initialize();
@@ -684,22 +704,22 @@ Y_UNIT_TEST_SUITE(KqpOlapSysView) {
auto rows = ExecuteScanQuery(tableClient, selectQuery);
// 3 Tables with 3 Shards each and 2 KindId-s of stats
- UNIT_ASSERT_VALUES_EQUAL(rows.size(), 3 * 3 * 2);
+ UNIT_ASSERT_VALUES_EQUAL(rows.size(), 3 * 3);
}
{
auto selectQuery = TString(R"(
SELECT
- count(distinct(PathId)),
- count(distinct(Kind)),
- count(distinct(TabletId))
+ count(distinct(PathId)) as PathsCount,
+ count(distinct(Kind)) as KindsCount,
+ count(distinct(TabletId)) as TabletsCount
FROM `/Root/olapStore/.sys/store_primary_index_stats`
)");
auto rows = ExecuteScanQuery(tableClient, selectQuery);
- UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows[0].at("column0")), 3ull);
- UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows[0].at("column1")), 2);
- UNIT_ASSERT_GE(GetUint64(rows[0].at("column2")), 3ull);
+ UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows[0].at("PathsCount")), 3ull);
+ UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows[0].at("KindsCount")), 1);
+ UNIT_ASSERT_VALUES_EQUAL(GetUint64(rows[0].at("TabletsCount")), 4);
}
{
diff --git a/ydb/core/kqp/ut/olap/ya.make b/ydb/core/kqp/ut/olap/ya.make
index f359312ebdb..352b7ac78f7 100644
--- a/ydb/core/kqp/ut/olap/ya.make
+++ b/ydb/core/kqp/ut/olap/ya.make
@@ -29,6 +29,8 @@ PEERDIR(
ydb/core/kqp/ut/common
ydb/library/yql/sql/pg_dummy
ydb/core/tx/columnshard/hooks/testing
+ ydb/core/tx/columnshard/test_helper
+ ydb/core/tx/columnshard
ydb/core/kqp/ut/olap/helpers
ydb/core/tx/datashard/ut_common
)
diff --git a/ydb/core/kqp/ut/scheme/kqp_constraints_ut.cpp b/ydb/core/kqp/ut/scheme/kqp_constraints_ut.cpp
index 46e25ebdad9..0138de4bf61 100644
--- a/ydb/core/kqp/ut/scheme/kqp_constraints_ut.cpp
+++ b/ydb/core/kqp/ut/scheme/kqp_constraints_ut.cpp
@@ -1,7 +1,8 @@
#include <ydb/core/kqp/ut/common/kqp_ut_common.h>
#include <ydb/core/formats/arrow/arrow_helpers.h>
+#include <ydb/core/tx/columnshard/test_helper/columnshard_ut_common.h>
+#include <ydb/core/tx/datashard/datashard.h>
#include <ydb/core/tx/tx_proxy/proxy.h>
-#include <ydb/core/tx/columnshard/columnshard_ut_common.h>
#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h>
#include <ydb/public/sdk/cpp/client/ydb_scheme/scheme.h>
#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h>
diff --git a/ydb/core/tx/columnshard/background_controller.h b/ydb/core/tx/columnshard/background_controller.h
index b88e1be3bce..bb38f274406 100644
--- a/ydb/core/tx/columnshard/background_controller.h
+++ b/ydb/core/tx/columnshard/background_controller.h
@@ -17,6 +17,7 @@ private:
bool ActiveCleanupPortions = false;
bool ActiveCleanupTables = false;
+ bool ActiveCleanupInsertTable = false;
YDB_READONLY(TMonotonic, LastIndexationInstant, TMonotonic::Zero());
public:
THashSet<NOlap::TPortionAddress> GetConflictTTLPortions() const;
@@ -66,6 +67,18 @@ public:
bool IsCleanupTablesActive() const {
return ActiveCleanupTables;
}
+
+ void StartCleanupInsertTable() {
+ Y_ABORT_UNLESS(!ActiveCleanupInsertTable);
+ ActiveCleanupInsertTable = true;
+ }
+ void FinishCleanupInsertTable() {
+ Y_ABORT_UNLESS(ActiveCleanupInsertTable);
+ ActiveCleanupInsertTable = false;
+ }
+ bool IsCleanupInsertTableActive() const {
+ return ActiveCleanupInsertTable;
+ }
};
}
diff --git a/ydb/core/tx/columnshard/blobs_action/transaction/tx_gc_insert_table.cpp b/ydb/core/tx/columnshard/blobs_action/transaction/tx_gc_insert_table.cpp
index 1571564621d..20507a0c44f 100644
--- a/ydb/core/tx/columnshard/blobs_action/transaction/tx_gc_insert_table.cpp
+++ b/ydb/core/tx/columnshard/blobs_action/transaction/tx_gc_insert_table.cpp
@@ -30,6 +30,7 @@ void TTxInsertTableCleanup::Complete(const TActorContext& /*ctx*/) {
Y_ABORT_UNLESS(BlobsAction);
BlobsAction->OnCompleteTxAfterRemoving(*Self, true);
+ Self->BackgroundController.FinishCleanupInsertTable();
Self->EnqueueBackgroundActivities();
}
diff --git a/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp b/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp
index f4bd0b23037..7b9a74b7885 100644
--- a/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp
+++ b/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp
@@ -130,6 +130,7 @@ void TTxWrite::Complete(const TActorContext& ctx) {
const auto& writeMeta = buffer.GetAggregations()[i]->GetWriteData()->GetWriteMeta();
auto operation = Self->OperationsManager->GetOperation((TWriteId)writeMeta.GetWriteId());
if (operation) {
+ CompleteTransaction(operation->GetLockId(), ctx);
ctx.Send(writeMeta.GetSource(), Results[i].release(), 0, operation->GetCookie());
} else {
ctx.Send(writeMeta.GetSource(), Results[i].release());
diff --git a/ydb/core/tx/columnshard/columnshard__progress_tx.cpp b/ydb/core/tx/columnshard/columnshard__progress_tx.cpp
index 7d0478aa759..6db95153b6d 100644
--- a/ydb/core/tx/columnshard/columnshard__progress_tx.cpp
+++ b/ydb/core/tx/columnshard/columnshard__progress_tx.cpp
@@ -41,14 +41,8 @@ public:
}
TxOperator = Self->ProgressTxController->GetVerifiedTxOperator(txId);
- AFL_VERIFY(TxOperator->Progress(*Self, NOlap::TSnapshot(step, txId), txc));
+ AFL_VERIFY(TxOperator->ExecuteOnProgress(*Self, NOlap::TSnapshot(step, txId), txc));
Self->ProgressTxController->FinishPlannedTx(txId, txc);
- Self->RescheduleWaitingReads();
- }
-
- Self->ProgressTxInFlight = false;
- if (!!Self->ProgressTxController->GetPlannedTx()) {
- Self->EnqueueProgressTx(ctx);
}
return true;
}
@@ -56,7 +50,8 @@ public:
void Complete(const TActorContext& ctx) override {
NActors::TLogContextGuard logGuard = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())("tx_state", "complete");
if (TxOperator) {
- TxOperator->Complete(*Self, ctx);
+ TxOperator->CompleteOnProgress(*Self, ctx);
+ Self->RescheduleWaitingReads();
}
if (PlannedQueueItem) {
Self->GetProgressTxController().CompleteRunningTx(*PlannedQueueItem);
@@ -64,11 +59,15 @@ public:
if (LastCompletedTx) {
Self->LastCompletedTx = std::max(*LastCompletedTx, Self->LastCompletedTx);
}
- Self->SetupIndexation();
+ Self->ProgressTxInFlight = false;
+ if (!!Self->ProgressTxController->GetPlannedTx()) {
+ Self->EnqueueProgressTx(ctx);
+ }
+ Self->EnqueueBackgroundActivities(false);
}
private:
- TTxController::ITransactionOperatior::TPtr TxOperator;
+ TTxController::ITransactionOperator::TPtr TxOperator;
const ui32 TabletTxNo;
std::optional<NOlap::TSnapshot> LastCompletedTx;
std::optional<TTxController::TPlanQueueItem> PlannedQueueItem;
diff --git a/ydb/core/tx/columnshard/columnshard__propose_cancel.cpp b/ydb/core/tx/columnshard/columnshard__propose_cancel.cpp
index 9b9ed721cc0..d3da820725a 100644
--- a/ydb/core/tx/columnshard/columnshard__propose_cancel.cpp
+++ b/ydb/core/tx/columnshard/columnshard__propose_cancel.cpp
@@ -19,12 +19,15 @@ public:
const auto* msg = Ev->Get();
const ui64 txId = msg->Record.GetTxId();
- Self->ProgressTxController->CancelTx(txId, txc);
+ Self->ProgressTxController->ExecuteOnCancel(txId, txc);
return true;
}
- void Complete(const TActorContext&) override {
+ void Complete(const TActorContext& ctx) override {
LOG_S_DEBUG("TTxProposeCancel.Complete");
+ const auto* msg = Ev->Get();
+ const ui64 txId = msg->Record.GetTxId();
+ Self->ProgressTxController->CompleteOnCancel(txId, ctx);
}
private:
diff --git a/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp b/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp
index 3688e74126e..9c0265ceb6d 100644
--- a/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp
+++ b/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp
@@ -132,8 +132,12 @@ void TTxProposeTransaction::OnProposeResult(TTxController::TProposeResult& propo
}
void TTxProposeTransaction::Complete(const TActorContext& ctx) {
+ auto& record = Proto(Ev->Get());
+ const ui64 txId = record.GetTxId();
+
Y_ABORT_UNLESS(Ev);
Y_ABORT_UNLESS(Result);
+ CompleteTransaction(txId, ctx);
ctx.Send(Ev->Get()->GetSource(), Result.release());
Self->TryRegisterMediatorTimeCast();
}
diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp
index b04af261d98..14d35d75704 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.cpp
+++ b/ydb/core/tx/columnshard/columnshard_impl.cpp
@@ -521,7 +521,7 @@ void TColumnShard::EnqueueBackgroundActivities(const bool periodic) {
SendPeriodicStats();
if (!TablesManager.HasPrimaryIndex()) {
- LOG_S_NOTICE("Background activities cannot be started: no index at tablet " << TabletID());
+ AFL_NOTICE(NKikimrServices::TX_COLUMNSHARD)("problem", "Background activities cannot be started: no index at tablet");
return;
}
// !!!!!! MUST BE FIRST THROUGH DATA HAVE TO BE SAME IN SESSIONS AFTER TABLET RESTART
@@ -855,11 +855,16 @@ void TColumnShard::Handle(TEvPrivate::TEvGarbageCollectionFinished::TPtr& ev, co
void TColumnShard::SetupCleanupInsertTable() {
auto writeIdsToCleanup = InsertTable->OldWritesToAbort(AppData()->TimeProvider->Now());
+ if (BackgroundController.IsCleanupInsertTableActive()) {
+ ACFL_DEBUG("background", "cleanup_insert_table")("skip_reason", "in_progress");
+ return;
+ }
+
if (!InsertTable->GetAborted().size() && !writeIdsToCleanup.size()) {
return;
}
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "cleanup_started")("aborted", InsertTable->GetAborted().size())("to_cleanup", writeIdsToCleanup.size());
-
+ BackgroundController.StartCleanupInsertTable();
Execute(new TTxInsertTableCleanup(this, std::move(writeIdsToCleanup)), TActorContext::AsActorContext());
}
diff --git a/ydb/core/tx/columnshard/engines/storage/granule.h b/ydb/core/tx/columnshard/engines/storage/granule.h
index 1002139fbf3..35857032f8f 100644
--- a/ydb/core/tx/columnshard/engines/storage/granule.h
+++ b/ydb/core/tx/columnshard/engines/storage/granule.h
@@ -5,6 +5,7 @@
#include <ydb/core/tx/columnshard/counters/engine_logs.h>
#include <ydb/core/tx/columnshard/engines/column_engine.h>
#include <ydb/core/tx/columnshard/engines/portion_info.h>
+#include <ydb/core/tx/columnshard/hooks/abstract/abstract.h>
#include <ydb/core/base/appdata.h>
#include <ydb/core/formats/arrow/reader/position.h>
@@ -244,7 +245,7 @@ public:
}
void ActualizeOptimizer(const TInstant currentInstant) const {
- if (currentInstant - OptimizerPlanner->GetActualizationInstant() > TDuration::Seconds(1)) {
+ if (currentInstant - OptimizerPlanner->GetActualizationInstant() >= NYDBTest::TControllers::GetColumnShardController()->GetCompactionActualizationLag(TDuration::Seconds(1))) {
OptimizerPlanner->Actualize(currentInstant);
}
}
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lbuckets/optimizer.h b/ydb/core/tx/columnshard/engines/storage/optimizer/lbuckets/optimizer.h
index 148dd5da4a2..40f44d6e51a 100644
--- a/ydb/core/tx/columnshard/engines/storage/optimizer/lbuckets/optimizer.h
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lbuckets/optimizer.h
@@ -581,15 +581,15 @@ public:
return 0;
}
*/
-
- const ui64 count = BucketInfo.GetCount() + ((mainPortion && !isFinal) ? 1 : 0);
+ const bool isForce = NYDBTest::TControllers::GetColumnShardController()->GetCompactionControl() == NYDBTest::EOptimizerCompactionWeightControl::Force;
+ const ui64 count = BucketInfo.GetCount() + ((mainPortion && (!isFinal || isForce)) ? 1 : 0);
const ui64 recordsCount = BucketInfo.GetRecordsCount() + ((mainPortion && !isFinal) ? mainPortion->GetRecordsCount() : 0);
const ui64 sumBytes = BucketInfo.GetBytes() + ((mainPortion && !isFinal) ? mainPortion->GetTotalBlobBytes() : 0);
if (NYDBTest::TControllers::GetColumnShardController()->GetCompactionControl() == NYDBTest::EOptimizerCompactionWeightControl::Disable) {
return 0;
}
const ui64 weight = (10000000000.0 * count - sumBytes) * (isFinal ? 1 : 10);
- if (NYDBTest::TControllers::GetColumnShardController()->GetCompactionControl() == NYDBTest::EOptimizerCompactionWeightControl::Force) {
+ if (isForce) {
return (count > 1) ? weight : 0;
}
@@ -799,10 +799,14 @@ public:
}
} else {
if (MainPortion) {
- for (auto&& i : portions) {
- if (MainPortion->CrossPKWith(*i)) {
- portions.emplace_back(MainPortion);
- break;
+ if (portions.size() == 1) {
+ portions.emplace_back(MainPortion);
+ } else {
+ for (auto&& i : portions) {
+ if (MainPortion->CrossPKWith(*i)) {
+ portions.emplace_back(MainPortion);
+ break;
+ }
}
}
}
@@ -811,7 +815,7 @@ public:
stopInstant = Others.GetFutureStartInstant();
}
}
- AFL_VERIFY(portions.size() > 1);
+ AFL_VERIFY(portions.size() > 1)("size", portions.size());
ui64 size = 0;
for (auto&& i : portions) {
size += i->GetTotalBlobBytes();
diff --git a/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp b/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp
index 9a20c83958c..fc4607fb623 100644
--- a/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp
+++ b/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp
@@ -6,7 +6,7 @@
#include <ydb/core/tx/columnshard/engines/changes/indexation.h>
#include <ydb/core/tx/columnshard/engines/changes/ttl.h>
-#include <ydb/core/tx/columnshard/columnshard_ut_common.h>
+#include <ydb/core/tx/columnshard/test_helper/columnshard_ut_common.h>
#include <ydb/core/tx/columnshard/engines/changes/compaction.h>
#include <ydb/core/tx/columnshard/engines/portions/write_with_blobs.h>
#include <ydb/core/tx/columnshard/blobs_action/bs/storage.h>
@@ -16,6 +16,8 @@
#include <ydb/core/tx/columnshard/background_controller.h>
#include <ydb/core/tx/columnshard/engines/changes/abstract/abstract.h>
#include <ydb/core/tx/columnshard/test_helper/helper.h>
+#include <ydb/core/tx/columnshard/engines/insert_table/insert_table.h>
+#include <ydb/core/tx/columnshard/columnshard_schema.h>
namespace NKikimr {
diff --git a/ydb/core/tx/columnshard/engines/ut/ut_program.cpp b/ydb/core/tx/columnshard/engines/ut/ut_program.cpp
index a4586c48946..798ba6ec505 100644
--- a/ydb/core/tx/columnshard/engines/ut/ut_program.cpp
+++ b/ydb/core/tx/columnshard/engines/ut/ut_program.cpp
@@ -1,7 +1,7 @@
#include <ydb/core/tx/columnshard/engines/index_info.h>
#include <ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/resolver.h>
-#include <ydb/core/tx/columnshard/columnshard_ut_common.h>
+#include <ydb/core/tx/columnshard/test_helper/columnshard_ut_common.h>
#include <ydb/core/tx/columnshard/test_helper/helper.h>
#include <ydb/core/tx/program/program.h>
#include <ydb/core/formats/arrow/converter.h>
diff --git a/ydb/core/tx/columnshard/engines/ut/ya.make b/ydb/core/tx/columnshard/engines/ut/ya.make
index 41a7d7b2aac..44ab1190e1e 100644
--- a/ydb/core/tx/columnshard/engines/ut/ya.make
+++ b/ydb/core/tx/columnshard/engines/ut/ya.make
@@ -37,7 +37,6 @@ SRCS(
ut_logs_engine.cpp
ut_program.cpp
helper.cpp
- ydb/core/tx/columnshard/columnshard_ut_common.cpp
)
END()
diff --git a/ydb/core/tx/columnshard/hooks/abstract/abstract.h b/ydb/core/tx/columnshard/hooks/abstract/abstract.h
index 7a64cc5a816..b1c13dbe04b 100644
--- a/ydb/core/tx/columnshard/hooks/abstract/abstract.h
+++ b/ydb/core/tx/columnshard/hooks/abstract/abstract.h
@@ -88,6 +88,10 @@ public:
using TPtr = std::shared_ptr<ICSController>;
virtual ~ICSController() = default;
+ virtual TDuration GetCompactionActualizationLag(const TDuration def) const {
+ return def;
+ }
+
virtual NColumnShard::TBlobPutResult::TPtr OverrideBlobPutResultOnCompaction(const NColumnShard::TBlobPutResult::TPtr original, const NOlap::TWriteActionsCollection& /*actions*/) const {
return original;
}
diff --git a/ydb/core/tx/columnshard/hooks/testing/controller.h b/ydb/core/tx/columnshard/hooks/testing/controller.h
index 9ba5865cdcd..1fcd157118c 100644
--- a/ydb/core/tx/columnshard/hooks/testing/controller.h
+++ b/ydb/core/tx/columnshard/hooks/testing/controller.h
@@ -206,6 +206,62 @@ protected:
}
public:
+ void WaitCompactions(const TDuration d) const {
+ TInstant start = TInstant::Now();
+ ui32 compactionsStart = GetCompactionStartedCounter().Val();
+ while (Now() - start < d) {
+ if (compactionsStart != GetCompactionStartedCounter().Val()) {
+ compactionsStart = GetCompactionStartedCounter().Val();
+ start = TInstant::Now();
+ }
+ Cerr << "WAIT_COMPACTION: " << GetCompactionStartedCounter().Val() << Endl;
+ Sleep(TDuration::Seconds(1));
+ }
+ }
+
+ void WaitIndexation(const TDuration d) const {
+ TInstant start = TInstant::Now();
+ ui32 compactionsStart = GetInsertStartedCounter().Val();
+ while (Now() - start < d) {
+ if (compactionsStart != GetInsertStartedCounter().Val()) {
+ compactionsStart = GetInsertStartedCounter().Val();
+ start = TInstant::Now();
+ }
+ Cerr << "WAIT_INDEXATION: " << GetInsertStartedCounter().Val() << Endl;
+ Sleep(TDuration::Seconds(1));
+ }
+ }
+
+ template <class TTester>
+ void WaitCondition(const TDuration d, const TTester& test) const {
+ const TInstant start = TInstant::Now();
+ while (TInstant::Now() - start < d) {
+ if (test()) {
+ Cerr << "condition SUCCESS!!..." << TInstant::Now() - start << Endl;
+ return;
+ } else {
+ Cerr << "waiting condition..." << TInstant::Now() - start << Endl;
+ Sleep(TDuration::Seconds(1));
+ }
+ }
+ AFL_VERIFY(false)("reason", "condition not reached");
+ }
+
+ void WaitActualization(const TDuration d) const {
+ TInstant start = TInstant::Now();
+ const i64 startVal = NeedActualizationCount.Val();
+ i64 predVal = NeedActualizationCount.Val();
+ while (TInstant::Now() - start < d && (!startVal || NeedActualizationCount.Val())) {
+ Cerr << "waiting actualization: " << NeedActualizationCount.Val() << "/" << TInstant::Now() - start << Endl;
+ if (NeedActualizationCount.Val() != predVal) {
+ predVal = NeedActualizationCount.Val();
+ start = TInstant::Now();
+ }
+ Sleep(TDuration::Seconds(1));
+ }
+ AFL_VERIFY(!NeedActualizationCount.Val());
+ }
+
virtual TDuration GetRemovedPortionLivetime(const TDuration /*def*/) const override {
return TDuration::Zero();
}
@@ -240,15 +296,6 @@ public:
DisabledBackgrounds.erase(id);
}
- void WaitActualization(const TDuration d) const {
- const TInstant start = TInstant::Now();
- while (TInstant::Now() - start < d && NeedActualizationCount.Val()) {
- Cerr << "waiting actualization: " << NeedActualizationCount.Val() << "/" << TInstant::Now() - start << Endl;
- Sleep(TDuration::Seconds(1));
- }
- AFL_VERIFY(!NeedActualizationCount.Val());
- }
-
std::vector<ui64> GetShardActualIds() const {
TGuard<TMutex> g(Mutex);
std::vector<ui64> result;
diff --git a/ydb/core/tx/columnshard/columnshard_ut_common.cpp b/ydb/core/tx/columnshard/test_helper/columnshard_ut_common.cpp
index 8c532e01223..a37342c4f72 100644
--- a/ydb/core/tx/columnshard/columnshard_ut_common.cpp
+++ b/ydb/core/tx/columnshard/test_helper/columnshard_ut_common.cpp
@@ -1,7 +1,7 @@
#include "columnshard_ut_common.h"
-#include "common/tests/shard_reader.h"
-#include "engines/reader/sys_view/chunks/chunks.h"
+#include <ydb/core/tx/columnshard/common/tests/shard_reader.h>
+#include <ydb/core/tx/columnshard/engines/reader/sys_view/portions/portions.h>
#include <ydb/core/base/tablet.h>
#include <ydb/core/base/tablet_resolver.h>
@@ -170,11 +170,11 @@ void ScanIndexStats(TTestBasicRuntime& runtime, TActorId& sender, const std::vec
record.SetTxId(snap.GetPlanStep());
record.SetScanId(scanId);
// record.SetLocalPathId(0);
- record.SetTablePath(NOlap::TIndexInfo::STORE_INDEX_STATS_TABLE);
+ record.SetTablePath(NOlap::TIndexInfo::STORE_INDEX_PORTION_STATS_TABLE);
// Schema: pathId, kind, rows, bytes, rawBytes. PK: {pathId, kind}
//record.SetSchemaVersion(0);
- auto ydbSchema = NOlap::NReader::NSysView::NChunks::TStatsIterator::StatsSchema;
+ auto ydbSchema = NOlap::NReader::NSysView::NPortions::TStatsIterator::StatsSchema;
for (const auto& col : ydbSchema.Columns) {
record.AddColumnTags(col.second.Id);
auto columnType = NScheme::ProtoColumnTypeFromTypeInfoMod(col.second.PType, col.second.PTypeMod);
@@ -224,6 +224,11 @@ void PlanCommit(TTestBasicRuntime& runtime, TActorId& sender, ui64 planStep, con
PlanCommit(runtime, sender, TTestTxConfig::TxTablet0, planStep, txIds);
}
+void Wakeup(TTestBasicRuntime& runtime, TActorId& sender, const ui64 shardId) {
+ auto wakeup = std::make_unique<TEvPrivate::TEvPeriodicWakeup>(true);
+ ForwardToTablet(runtime, shardId, sender, wakeup.release());
+}
+
void PlanCommit(TTestBasicRuntime& runtime, TActorId& sender, ui64 shardId, ui64 planStep, const TSet<ui64>& txIds) {
auto plan = std::make_unique<TEvTxProcessing::TEvPlanStep>(planStep, 0, shardId);
for (ui64 txId : txIds) {
@@ -243,6 +248,7 @@ void PlanCommit(TTestBasicRuntime& runtime, TActorId& sender, ui64 shardId, ui64
UNIT_ASSERT(txIds.contains(res.GetTxId()));
UNIT_ASSERT_EQUAL(res.GetStatus(), NKikimrTxColumnShard::EResultStatus::SUCCESS);
}
+ Wakeup(runtime, sender, shardId);
}
TCell MakeTestCell(const TTypeInfo& typeInfo, ui32 value, std::vector<TString>& mem) {
diff --git a/ydb/core/tx/columnshard/columnshard_ut_common.h b/ydb/core/tx/columnshard/test_helper/columnshard_ut_common.h
index 8feec369ea2..c696d3b940c 100644
--- a/ydb/core/tx/columnshard/columnshard_ut_common.h
+++ b/ydb/core/tx/columnshard/test_helper/columnshard_ut_common.h
@@ -1,9 +1,8 @@
#pragma once
-#include "columnshard.h"
-#include "columnshard_impl.h"
-#include "blob_cache.h"
-#include "engines/scheme/statistics/max/operator.h"
+#include <ydb/core/tx/columnshard/blob_cache.h>
+#include <ydb/core/tx/columnshard/engines/scheme/statistics/max/operator.h>
+#include <ydb/core/tx/columnshard/common/snapshot.h>
#include <ydb/core/formats/arrow/arrow_batch_builder.h>
#include <ydb/core/tx/columnshard/test_helper/helper.h>
@@ -11,7 +10,15 @@
#include <ydb/core/scheme/scheme_types_proto.h>
#include <ydb/core/testlib/tablet_helpers.h>
#include <ydb/core/testlib/test_client.h>
+#include <ydb/core/protos/tx_columnshard.pb.h>
+#include <ydb/services/metadata/abstract/fetcher.h>
+
#include <library/cpp/testing/unittest/registar.h>
+#include <ydb/core/tx/long_tx_service/public/types.h>
+
+namespace NKikimr::NOlap {
+struct TIndexInfo;
+}
namespace NKikimr::NTxUT {
@@ -449,6 +456,8 @@ inline void PlanCommit(TTestBasicRuntime& runtime, TActorId& sender, ui64 planSt
PlanCommit(runtime, sender, planStep, ids);
}
+void Wakeup(TTestBasicRuntime& runtime, TActorId& sender, const ui64 shardId);
+
struct TTestBlobOptions {
THashSet<TString> NullColumns;
THashSet<TString> SameValueColumns;
diff --git a/ydb/core/tx/columnshard/test_helper/controllers.cpp b/ydb/core/tx/columnshard/test_helper/controllers.cpp
index d9ee86446c6..997a700d901 100644
--- a/ydb/core/tx/columnshard/test_helper/controllers.cpp
+++ b/ydb/core/tx/columnshard/test_helper/controllers.cpp
@@ -1,7 +1,7 @@
+#include "columnshard_ut_common.h"
#include "controllers.h"
#include <ydb/core/tx/columnshard/engines/changes/ttl.h>
#include <ydb/core/tx/columnshard/engines/changes/indexation.h>
-#include <ydb/core/tx/columnshard/columnshard_ut_common.h>
namespace NKikimr::NOlap {
diff --git a/ydb/core/tx/columnshard/test_helper/controllers.h b/ydb/core/tx/columnshard/test_helper/controllers.h
index 682f2dcacaf..b18c2bc34e8 100644
--- a/ydb/core/tx/columnshard/test_helper/controllers.h
+++ b/ydb/core/tx/columnshard/test_helper/controllers.h
@@ -12,6 +12,7 @@ private:
ui32 TiersModificationsCount = 0;
YDB_READONLY(TAtomicCounter, StatisticsUsageCount, 0);
YDB_READONLY(TAtomicCounter, MaxValueUsageCount, 0);
+ YDB_ACCESSOR_DEF(std::optional<ui64>, SmallSizeDetector);
protected:
virtual void OnTieringModified(const std::shared_ptr<NKikimr::NColumnShard::TTiersManager>& /*tiers*/) override;
virtual void OnExportFinished() override {
@@ -21,7 +22,7 @@ protected:
return true;
}
virtual ui64 GetSmallPortionSizeDetector(const ui64 /*def*/) const override {
- return 0;
+ return SmallSizeDetector.value_or(0);
}
virtual TDuration GetOptimizerFreshnessCheckDuration(const TDuration /*defaultValue*/) const override {
return TDuration::Zero();
@@ -29,10 +30,17 @@ protected:
virtual TDuration GetLagForCompactionBeforeTierings(const TDuration /*def*/) const override {
return TDuration::Zero();
}
+ virtual TDuration GetCompactionActualizationLag(const TDuration /*def*/) const override {
+ return TDuration::Zero();
+ }
virtual TDuration GetTTLDefaultWaitingDuration(const TDuration /*defaultValue*/) const override {
return TDuration::Seconds(1);
}
public:
+ TWaitCompactionController() {
+ SetPeriodicWakeupActivationPeriod(TDuration::Seconds(1));
+ }
+
ui32 GetFinishedExportsCount() const {
return ExportsFinishedCount.Val();
}
diff --git a/ydb/core/tx/columnshard/test_helper/ya.make b/ydb/core/tx/columnshard/test_helper/ya.make
index 2083768b46e..b75f43de54f 100644
--- a/ydb/core/tx/columnshard/test_helper/ya.make
+++ b/ydb/core/tx/columnshard/test_helper/ya.make
@@ -13,6 +13,7 @@ PEERDIR(
SRCS(
helper.cpp
controllers.cpp
+ columnshard_ut_common.cpp
)
YQL_LAST_ABI_VERSION()
diff --git a/ydb/core/tx/columnshard/transactions/operators/backup.cpp b/ydb/core/tx/columnshard/transactions/operators/backup.cpp
index f55d2b82b0b..232aac166da 100644
--- a/ydb/core/tx/columnshard/transactions/operators/backup.cpp
+++ b/ydb/core/tx/columnshard/transactions/operators/backup.cpp
@@ -4,7 +4,7 @@
namespace NKikimr::NColumnShard {
-bool TBackupTransactionOperator::Parse(const TString& data) {
+bool TBackupTransactionOperator::Parse(TColumnShard& /*owner*/, const TString& data) {
NKikimrTxColumnShard::TBackupTxBody txBody;
if (!txBody.ParseFromString(data)) {
return false;
@@ -32,7 +32,7 @@ bool TBackupTransactionOperator::Parse(const TString& data) {
return true;
}
-TBackupTransactionOperator::TProposeResult TBackupTransactionOperator::Propose(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& /*txc*/, bool /*proposed*/) const {
+TBackupTransactionOperator::TProposeResult TBackupTransactionOperator::ExecuteOnPropose(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& /*txc*/) const {
auto proposition = owner.GetExportsManager()->ProposeTask(ExportTask);
if (!proposition) {
return TProposeResult(NKikimrTxColumnShard::EResultStatus::ERROR,
@@ -41,14 +41,14 @@ TBackupTransactionOperator::TProposeResult TBackupTransactionOperator::Propose(T
return TProposeResult();
}
-bool TBackupTransactionOperator::Progress(TColumnShard& owner, const NOlap::TSnapshot& version, NTabletFlatExecutor::TTransactionContext& txc) {
+bool TBackupTransactionOperator::ExecuteOnProgress(TColumnShard& owner, const NOlap::TSnapshot& version, NTabletFlatExecutor::TTransactionContext& txc) {
Y_UNUSED(version);
AFL_VERIFY(ExportTask);
owner.GetExportsManager()->ConfirmSessionOnExecute(ExportTask->GetIdentifier(), txc);
return true;
}
-bool TBackupTransactionOperator::Complete(TColumnShard& owner, const TActorContext& ctx) {
+bool TBackupTransactionOperator::CompleteOnProgress(TColumnShard& owner, const TActorContext& ctx) {
AFL_VERIFY(ExportTask);
owner.GetExportsManager()->ConfirmSessionOnComplete(ExportTask->GetIdentifier());
auto result = std::make_unique<TEvColumnShard::TEvProposeTransactionResult>(
@@ -59,7 +59,7 @@ bool TBackupTransactionOperator::Complete(TColumnShard& owner, const TActorConte
return true;
}
-bool TBackupTransactionOperator::Abort(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc) {
+bool TBackupTransactionOperator::ExecuteOnAbort(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc) {
owner.GetExportsManager()->RemoveSession(ExportTask->GetIdentifier(), txc);
return true;
}
diff --git a/ydb/core/tx/columnshard/transactions/operators/backup.h b/ydb/core/tx/columnshard/transactions/operators/backup.h
index bc873e3dbda..1e9612f6309 100644
--- a/ydb/core/tx/columnshard/transactions/operators/backup.h
+++ b/ydb/core/tx/columnshard/transactions/operators/backup.h
@@ -6,24 +6,30 @@
namespace NKikimr::NColumnShard {
- class TBackupTransactionOperator : public TTxController::ITransactionOperatior {
+ class TBackupTransactionOperator : public TTxController::ITransactionOperator {
private:
std::shared_ptr<NOlap::NExport::TExportTask> ExportTask;
- using TBase = TTxController::ITransactionOperatior;
+ using TBase = TTxController::ITransactionOperator;
using TProposeResult = TTxController::TProposeResult;
static inline auto Registrator = TFactory::TRegistrator<TBackupTransactionOperator>(NKikimrTxColumnShard::TX_KIND_BACKUP);
public:
using TBase::TBase;
- virtual bool Parse(const TString& data) override;
+ virtual bool Parse(TColumnShard& owner, const TString& data) override;
- virtual TProposeResult Propose(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc, bool /*proposed*/) const override;
+ virtual TProposeResult ExecuteOnPropose(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc) const override;
+ virtual bool CompleteOnPropose(TColumnShard& /*owner*/, const TActorContext& /*ctx*/) const override {
+ return true;
+ }
- virtual bool Progress(TColumnShard& owner, const NOlap::TSnapshot& version, NTabletFlatExecutor::TTransactionContext& txc) override;
+ virtual bool ExecuteOnProgress(TColumnShard& owner, const NOlap::TSnapshot& version, NTabletFlatExecutor::TTransactionContext& txc) override;
- virtual bool Complete(TColumnShard& owner, const TActorContext& ctx) override;
+ virtual bool CompleteOnProgress(TColumnShard& owner, const TActorContext& ctx) override;
- virtual bool Abort(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc) override;
+ virtual bool ExecuteOnAbort(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc) override;
+ virtual bool CompleteOnAbort(TColumnShard& /*owner*/, const TActorContext& /*ctx*/) override {
+ return true;
+ }
};
}
diff --git a/ydb/core/tx/columnshard/transactions/operators/ev_write.h b/ydb/core/tx/columnshard/transactions/operators/ev_write.h
index ca83ce8696a..81e2ad35d8a 100644
--- a/ydb/core/tx/columnshard/transactions/operators/ev_write.h
+++ b/ydb/core/tx/columnshard/transactions/operators/ev_write.h
@@ -4,14 +4,14 @@
namespace NKikimr::NColumnShard {
- class TEvWriteTransactionOperator : public TTxController::ITransactionOperatior {
- using TBase = TTxController::ITransactionOperatior;
+ class TEvWriteTransactionOperator : public TTxController::ITransactionOperator {
+ using TBase = TTxController::ITransactionOperator;
using TProposeResult = TTxController::TProposeResult;
static inline auto Registrator = TFactory::TRegistrator<TEvWriteTransactionOperator>(NKikimrTxColumnShard::TX_KIND_COMMIT_WRITE);
public:
using TBase::TBase;
- virtual bool Parse(const TString& data) override {
+ virtual bool Parse(TColumnShard& /*owner*/, const TString& data) override {
NKikimrTxColumnShard::TCommitWriteTxBody commitTxBody;
if (!commitTxBody.ParseFromString(data)) {
return false;
@@ -20,24 +20,32 @@ namespace NKikimr::NColumnShard {
return !!LockId;
}
- TProposeResult Propose(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc, bool /*proposed*/) const override {
+ TProposeResult ExecuteOnPropose(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc) const override {
owner.OperationsManager->LinkTransaction(LockId, GetTxId(), txc);
return TProposeResult();
}
- virtual bool Progress(TColumnShard& owner, const NOlap::TSnapshot& version, NTabletFlatExecutor::TTransactionContext& txc) override {
+ bool CompleteOnPropose(TColumnShard& /*owner*/, const TActorContext& /*ctx*/) const override {
+ return true;
+ }
+
+ virtual bool ExecuteOnProgress(TColumnShard& owner, const NOlap::TSnapshot& version, NTabletFlatExecutor::TTransactionContext& txc) override {
return owner.OperationsManager->CommitTransaction(owner, GetTxId(), txc, version);
}
- virtual bool Complete(TColumnShard& owner, const TActorContext& ctx) override {
+ virtual bool CompleteOnProgress(TColumnShard& owner, const TActorContext& ctx) override {
auto result = NEvents::TDataEvents::TEvWriteResult::BuildCompleted(owner.TabletID(), GetTxId());
ctx.Send(TxInfo.Source, result.release(), 0, TxInfo.Cookie);
return true;
}
- virtual bool Abort(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc) override {
+ virtual bool ExecuteOnAbort(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc) override {
return owner.OperationsManager->AbortTransaction(owner, GetTxId(), txc);
}
+ virtual bool CompleteOnAbort(TColumnShard& /*owner*/, const TActorContext& /*ctx*/) override {
+ return true;
+ }
+
private:
ui64 LockId = 0;
};
diff --git a/ydb/core/tx/columnshard/transactions/operators/long_tx_write.h b/ydb/core/tx/columnshard/transactions/operators/long_tx_write.h
index 7ef86171f31..69622fc5031 100644
--- a/ydb/core/tx/columnshard/transactions/operators/long_tx_write.h
+++ b/ydb/core/tx/columnshard/transactions/operators/long_tx_write.h
@@ -4,14 +4,14 @@
namespace NKikimr::NColumnShard {
- class TLongTxTransactionOperator : public TTxController::ITransactionOperatior {
- using TBase = TTxController::ITransactionOperatior;
+ class TLongTxTransactionOperator : public TTxController::ITransactionOperator {
+ using TBase = TTxController::ITransactionOperator;
using TProposeResult = TTxController::TProposeResult;
static inline auto Registrator = TFactory::TRegistrator<TLongTxTransactionOperator>(NKikimrTxColumnShard::TX_KIND_COMMIT);
public:
using TBase::TBase;
- bool Parse(const TString& data) override {
+ bool Parse(TColumnShard& /*owner*/, const TString& data) override {
NKikimrTxColumnShard::TCommitTxBody commitTxBody;
if (!commitTxBody.ParseFromString(data)) {
return false;
@@ -31,7 +31,7 @@ namespace NKikimr::NColumnShard {
}
}
- TProposeResult Propose(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& /*txc*/, bool /*proposed*/) const override {
+ TProposeResult ExecuteOnPropose(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& /*txc*/) const override {
if (WriteIds.empty()) {
return TProposeResult(NKikimrTxColumnShard::EResultStatus::ERROR,
TStringBuilder() << "Commit TxId# " << GetTxId() << " has an empty list of write ids");
@@ -55,7 +55,11 @@ namespace NKikimr::NColumnShard {
return TProposeResult();;
}
- bool Progress(TColumnShard& owner, const NOlap::TSnapshot& version, NTabletFlatExecutor::TTransactionContext& txc) override {
+ bool CompleteOnPropose(TColumnShard& /*owner*/, const TActorContext& /*ctx*/) const override {
+ return true;
+ }
+
+ bool ExecuteOnProgress(TColumnShard& owner, const NOlap::TSnapshot& version, NTabletFlatExecutor::TTransactionContext& txc) override {
TBlobGroupSelector dsGroupSelector(owner.Info());
NOlap::TDbWrapper dbTable(txc.DB, &dsGroupSelector);
@@ -78,7 +82,7 @@ namespace NKikimr::NColumnShard {
return true;
}
- bool Complete(TColumnShard& owner, const TActorContext& ctx) override {
+ bool CompleteOnProgress(TColumnShard& owner, const TActorContext& ctx) override {
auto result = std::make_unique<TEvColumnShard::TEvProposeTransactionResult>(
owner.TabletID(), TxInfo.TxKind, GetTxId(), NKikimrTxColumnShard::SUCCESS);
result->Record.SetStep(TxInfo.PlanStep);
@@ -86,7 +90,7 @@ namespace NKikimr::NColumnShard {
return true;
}
- bool Abort(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc) override {
+ virtual bool ExecuteOnAbort(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc) override {
NIceDb::TNiceDb db(txc.DB);
for (TWriteId writeId : WriteIds) {
owner.RemoveLongTxWrite(db, writeId, GetTxId());
@@ -96,6 +100,9 @@ namespace NKikimr::NColumnShard {
owner.InsertTable->Abort(dbTable, WriteIds);
return true;
}
+ virtual bool CompleteOnAbort(TColumnShard& /*owner*/, const TActorContext& /*ctx*/) override {
+ return true;
+ }
private:
THashSet<TWriteId> WriteIds;
diff --git a/ydb/core/tx/columnshard/transactions/operators/schema.h b/ydb/core/tx/columnshard/transactions/operators/schema.h
index deb2582c7b3..d5907e4574a 100644
--- a/ydb/core/tx/columnshard/transactions/operators/schema.h
+++ b/ydb/core/tx/columnshard/transactions/operators/schema.h
@@ -4,14 +4,14 @@
namespace NKikimr::NColumnShard {
- class TSchemaTransactionOperator : public TTxController::ITransactionOperatior {
- using TBase = TTxController::ITransactionOperatior;
+ class TSchemaTransactionOperator : public TTxController::ITransactionOperator {
+ using TBase = TTxController::ITransactionOperator;
using TProposeResult = TTxController::TProposeResult;
static inline auto Registrator = TFactory::TRegistrator<TSchemaTransactionOperator>(NKikimrTxColumnShard::TX_KIND_SCHEMA);
public:
using TBase::TBase;
- virtual bool Parse(const TString& data) override {
+ virtual bool Parse(TColumnShard& /*owner*/, const TString& data) override {
if (!SchemaTxBody.ParseFromString(data)) {
return false;
}
@@ -22,7 +22,7 @@ namespace NKikimr::NColumnShard {
return false;
}
- TProposeResult Propose(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc, bool /*proposed*/) const override {
+ TProposeResult ExecuteOnPropose(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc) const override {
switch (SchemaTxBody.TxBody_case()) {
case NKikimrTxColumnShard::TSchemaTxBody::kInitShard:
{
@@ -66,13 +66,17 @@ namespace NKikimr::NColumnShard {
return TProposeResult();
}
- virtual bool Progress(TColumnShard& owner, const NOlap::TSnapshot& version, NTabletFlatExecutor::TTransactionContext& txc) override {
+ virtual bool CompleteOnPropose(TColumnShard& /*owner*/, const TActorContext& /*ctx*/) const override {
+ return true;
+ }
+
+ virtual bool ExecuteOnProgress(TColumnShard& owner, const NOlap::TSnapshot& version, NTabletFlatExecutor::TTransactionContext& txc) override {
owner.RunSchemaTx(SchemaTxBody, version, txc);
owner.ProtectSchemaSeqNo(SchemaTxBody.GetSeqNo(), txc);
return true;
}
- virtual bool Complete(TColumnShard& owner, const TActorContext& ctx) override {
+ virtual bool CompleteOnProgress(TColumnShard& owner, const TActorContext& ctx) override {
for (TActorId subscriber : NotifySubscribers) {
auto event = MakeHolder<TEvColumnShard::TEvNotifyTxCompletionResult>(owner.TabletID(), GetTxId());
ctx.Send(subscriber, event.Release(), 0, 0);
@@ -85,8 +89,10 @@ namespace NKikimr::NColumnShard {
return true;
}
- virtual bool Abort(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc) override {
- Y_UNUSED(owner, txc);
+ virtual bool ExecuteOnAbort(TColumnShard& /*owner*/, NTabletFlatExecutor::TTransactionContext& /*txc*/) override {
+ return true;
+ }
+ virtual bool CompleteOnAbort(TColumnShard& /*owner*/, const TActorContext& /*ctx*/) override {
return true;
}
diff --git a/ydb/core/tx/columnshard/transactions/propose_transaction_base.cpp b/ydb/core/tx/columnshard/transactions/propose_transaction_base.cpp
index ef50e2ae595..7f6c1911fda 100644
--- a/ydb/core/tx/columnshard/transactions/propose_transaction_base.cpp
+++ b/ydb/core/tx/columnshard/transactions/propose_transaction_base.cpp
@@ -6,8 +6,8 @@
namespace NKikimr::NColumnShard {
void TProposeTransactionBase::ProposeTransaction(const TTxController::TBasicTxInfo& txInfo, const TString& txBody, const TActorId source, const ui64 cookie, TTransactionContext& txc) {
- auto txOperator = TTxController::ITransactionOperatior::TFactory::MakeHolder(txInfo.TxKind, TTxController::TTxInfo(txInfo.TxKind, txInfo.TxId));
- if (!txOperator || !txOperator->Parse(txBody)) {
+ auto txOperator = TTxController::ITransactionOperator::TFactory::MakeHolder(txInfo.TxKind, TTxController::TTxInfo(txInfo.TxKind, txInfo.TxId));
+ if (!txOperator || !txOperator->Parse(*Self, txBody)) {
TTxController::TProposeResult proposeResult(NKikimrTxColumnShard::EResultStatus::ERROR, TStringBuilder() << "Error processing commit TxId# " << txInfo.TxId
<< (txOperator ? ". Parsing error " : ". Unknown operator for txKind"));
OnProposeError(proposeResult, txInfo);
@@ -23,7 +23,7 @@ namespace NKikimr::NColumnShard {
TTxController::TProposeResult proposeResult;
OnProposeResult(proposeResult, *txInfoPtr);
} else {
- auto proposeResult = txOperator->Propose(*Self, txc, false);
+ auto proposeResult = txOperator->ExecuteOnPropose(*Self, txc);
if (!!proposeResult) {
const auto fullTxInfo = txOperator->TxWithDeadline() ? Self->GetProgressTxController().RegisterTxWithDeadline(txInfo.TxId, txInfo.TxKind, txBody, source, cookie, txc)
: Self->GetProgressTxController().RegisterTx(txInfo.TxId, txInfo.TxKind, txBody, source, cookie, txc);
@@ -34,4 +34,14 @@ namespace NKikimr::NColumnShard {
}
}
}
+
+ void TProposeTransactionBase::CompleteTransaction(const ui64 txId, const TActorContext& ctx) {
+ auto txOperator = Self->GetProgressTxController().GetTxOperator(txId);
+ if (!txOperator) {
+ AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("error", "cannot found txOperator in propose transaction base")("tx_id", txId);
+ } else {
+ txOperator->CompleteOnPropose(*Self, ctx);
+ }
+ }
+
}
diff --git a/ydb/core/tx/columnshard/transactions/propose_transaction_base.h b/ydb/core/tx/columnshard/transactions/propose_transaction_base.h
index 7657c4aff47..7502fb677ec 100644
--- a/ydb/core/tx/columnshard/transactions/propose_transaction_base.h
+++ b/ydb/core/tx/columnshard/transactions/propose_transaction_base.h
@@ -13,6 +13,7 @@ public:
protected:
void ProposeTransaction(const TTxController::TBasicTxInfo& txInfo, const TString& txBody, const TActorId source, const ui64 cookie, TTransactionContext& txc);
+ void CompleteTransaction(const ui64 txId, const TActorContext& ctx);
virtual void OnProposeResult(TTxController::TProposeResult& proposeResult, const TTxController::TTxInfo& txInfo) = 0;
virtual void OnProposeError(TTxController::TProposeResult& proposeResult, const TTxController::TBasicTxInfo& txInfo) = 0;
diff --git a/ydb/core/tx/columnshard/transactions/tx_controller.cpp b/ydb/core/tx/columnshard/transactions/tx_controller.cpp
index 43a56ecf348..087e79394e3 100644
--- a/ydb/core/tx/columnshard/transactions/tx_controller.cpp
+++ b/ydb/core/tx/columnshard/transactions/tx_controller.cpp
@@ -65,9 +65,9 @@ bool TTxController::Load(NTabletFlatExecutor::TTransactionContext& txc) {
}
const TString txBody = rowset.GetValue<Schema::TxInfo::TxBody>();
- ITransactionOperatior::TPtr txOperator(ITransactionOperatior::TFactory::Construct(txInfo.TxKind, txInfo));
+ ITransactionOperator::TPtr txOperator(ITransactionOperator::TFactory::Construct(txInfo.TxKind, txInfo));
Y_ABORT_UNLESS(!!txOperator);
- Y_ABORT_UNLESS(txOperator->Parse(txBody));
+ Y_ABORT_UNLESS(txOperator->Parse(Owner, txBody));
Operators[txId] = txOperator;
if (!rowset.Next()) {
@@ -77,7 +77,7 @@ bool TTxController::Load(NTabletFlatExecutor::TTransactionContext& txc) {
return true;
}
-TTxController::ITransactionOperatior::TPtr TTxController::GetTxOperator(const ui64 txId) {
+TTxController::ITransactionOperator::TPtr TTxController::GetTxOperator(const ui64 txId) {
auto it = Operators.find(txId);
if(it == Operators.end()) {
return nullptr;
@@ -85,7 +85,7 @@ TTxController::ITransactionOperatior::TPtr TTxController::GetTxOperator(const ui
return it->second;
}
-TTxController::ITransactionOperatior::TPtr TTxController::GetVerifiedTxOperator(const ui64 txId) {
+TTxController::ITransactionOperator::TPtr TTxController::GetVerifiedTxOperator(const ui64 txId) {
auto it = Operators.find(txId);
AFL_VERIFY(it != Operators.end())("tx_id", txId);
return it->second;
@@ -99,9 +99,9 @@ TTxController::TTxInfo TTxController::RegisterTx(const ui64 txId, const NKikimrT
txInfo.Source = source;
txInfo.Cookie = cookie;
- ITransactionOperatior::TPtr txOperator(ITransactionOperatior::TFactory::Construct(txInfo.TxKind, txInfo));
+ ITransactionOperator::TPtr txOperator(ITransactionOperator::TFactory::Construct(txInfo.TxKind, txInfo));
Y_ABORT_UNLESS(!!txOperator);
- Y_ABORT_UNLESS(txOperator->Parse(txBody));
+ Y_ABORT_UNLESS(txOperator->Parse(Owner, txBody));
Operators[txId] = txOperator;
Schema::SaveTxInfo(db, txInfo.TxId, txInfo.TxKind, txBody, Max<ui64>(), txInfo.Source, txInfo.Cookie);
@@ -118,9 +118,9 @@ TTxController::TTxInfo TTxController::RegisterTxWithDeadline(const ui64 txId, co
txInfo.MinStep = GetAllowedStep();
txInfo.MaxStep = txInfo.MinStep + MaxCommitTxDelay.MilliSeconds();
- ITransactionOperatior::TPtr txOperator(ITransactionOperatior::TFactory::Construct(txInfo.TxKind, txInfo));
+ ITransactionOperator::TPtr txOperator(ITransactionOperator::TFactory::Construct(txInfo.TxKind, txInfo));
Y_ABORT_UNLESS(!!txOperator);
- Y_ABORT_UNLESS(txOperator->Parse(txBody));
+ Y_ABORT_UNLESS(txOperator->Parse(Owner, txBody));
Operators[txId] = txOperator;
Schema::SaveTxInfo(db, txInfo.TxId, txInfo.TxKind, txBody, txInfo.MaxStep, txInfo.Source, txInfo.Cookie);
@@ -137,7 +137,8 @@ bool TTxController::AbortTx(const ui64 txId, NTabletFlatExecutor::TTransactionCo
auto opIt = Operators.find(txId);
Y_ABORT_UNLESS(opIt != Operators.end());
- opIt->second->Abort(Owner, txc);
+ opIt->second->ExecuteOnAbort(Owner, txc);
+ opIt->second->CompleteOnAbort(Owner, NActors::TActivationContext::AsActorContext());
if (it->second.MaxStep != Max<ui64>()) {
DeadlineQueue.erase(TPlanQueueItem(it->second.MaxStep, txId));
@@ -149,25 +150,41 @@ bool TTxController::AbortTx(const ui64 txId, NTabletFlatExecutor::TTransactionCo
return true;
}
-bool TTxController::CancelTx(const ui64 txId, NTabletFlatExecutor::TTransactionContext& txc) {
+bool TTxController::CompleteOnCancel(const ui64 txId, const TActorContext& ctx) {
auto it = BasicTxInfo.find(txId);
if (it == BasicTxInfo.end()) {
return true;
}
if (it->second.PlanStep != 0) {
- // Cannot cancel planned transaction
return false;
}
auto opIt = Operators.find(txId);
Y_ABORT_UNLESS(opIt != Operators.end());
- opIt->second->Abort(Owner, txc);
+ opIt->second->CompleteOnAbort(Owner, ctx);
if (it->second.MaxStep != Max<ui64>()) {
DeadlineQueue.erase(TPlanQueueItem(it->second.MaxStep, txId));
}
BasicTxInfo.erase(it);
Operators.erase(txId);
+ return true;
+}
+
+bool TTxController::ExecuteOnCancel(const ui64 txId, NTabletFlatExecutor::TTransactionContext& txc) {
+ auto it = BasicTxInfo.find(txId);
+ if (it == BasicTxInfo.end()) {
+ return true;
+ }
+ if (it->second.PlanStep != 0) {
+ // Cannot cancel planned transaction
+ return false;
+ }
+
+ auto opIt = Operators.find(txId);
+ Y_ABORT_UNLESS(opIt != Operators.end());
+ opIt->second->ExecuteOnAbort(Owner, txc);
+
NIceDb::TNiceDb db(txc.DB);
Schema::EraseTxInfo(db, txId);
return true;
@@ -188,13 +205,12 @@ std::optional<TTxController::TTxInfo> TTxController::StartPlannedTx() {
void TTxController::FinishPlannedTx(const ui64 txId, NTabletFlatExecutor::TTransactionContext& txc) {
NIceDb::TNiceDb db(txc.DB);
-
- BasicTxInfo.erase(txId);
- Operators.erase(txId);
Schema::EraseTxInfo(db, txId);
}
void TTxController::CompleteRunningTx(const TPlanQueueItem& txItem) {
+ AFL_VERIFY(BasicTxInfo.erase(txItem.TxId));
+ AFL_VERIFY(Operators.erase(txItem.TxId));
AFL_VERIFY(RunningQueue.erase(txItem))("info", txItem.DebugString());
}
diff --git a/ydb/core/tx/columnshard/transactions/tx_controller.h b/ydb/core/tx/columnshard/transactions/tx_controller.h
index d48bd504a1f..5a54471c87c 100644
--- a/ydb/core/tx/columnshard/transactions/tx_controller.h
+++ b/ydb/core/tx/columnshard/transactions/tx_controller.h
@@ -71,14 +71,14 @@ public:
}
};
- class ITransactionOperatior {
+ class ITransactionOperator {
protected:
TTxInfo TxInfo;
public:
- using TPtr = std::shared_ptr<ITransactionOperatior>;
- using TFactory = NObjectFactory::TParametrizedObjectFactory<ITransactionOperatior, NKikimrTxColumnShard::ETransactionKind, TTxInfo>;
+ using TPtr = std::shared_ptr<ITransactionOperator>;
+ using TFactory = NObjectFactory::TParametrizedObjectFactory<ITransactionOperator, NKikimrTxColumnShard::ETransactionKind, TTxInfo>;
- ITransactionOperatior(const TTxInfo& txInfo)
+ ITransactionOperator(const TTxInfo& txInfo)
: TxInfo(txInfo)
{}
@@ -86,18 +86,22 @@ public:
return TxInfo.TxId;
}
- virtual ~ITransactionOperatior() {}
+ virtual ~ITransactionOperator() {}
virtual bool TxWithDeadline() const {
return true;
}
- virtual bool Parse(const TString& data) = 0;
- virtual TProposeResult Propose(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc, bool proposed) const = 0;
+ virtual bool Parse(TColumnShard& owner, const TString& data) = 0;
+ virtual TProposeResult ExecuteOnPropose(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc) const = 0;
+ virtual bool CompleteOnPropose(TColumnShard& owner, const TActorContext& ctx) const = 0;
+
+ virtual bool ExecuteOnProgress(TColumnShard& owner, const NOlap::TSnapshot& version, NTabletFlatExecutor::TTransactionContext& txc) = 0;
+ virtual bool CompleteOnProgress(TColumnShard& owner, const TActorContext& ctx) = 0;
+
+ virtual bool ExecuteOnAbort(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc) = 0;
+ virtual bool CompleteOnAbort(TColumnShard& owner, const TActorContext& ctx) = 0;
- virtual bool Progress(TColumnShard& owner, const NOlap::TSnapshot& version, NTabletFlatExecutor::TTransactionContext& txc) = 0;
- virtual bool Abort(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc) = 0;
- virtual bool Complete(TColumnShard& owner, const TActorContext& ctx) = 0;
virtual void RegisterSubscriber(const TActorId&) {
AFL_VERIFY(false)("message", "Not implemented");
};
@@ -112,7 +116,7 @@ private:
std::set<TPlanQueueItem> PlanQueue;
std::set<TPlanQueueItem> RunningQueue;
- THashMap<ui64, ITransactionOperatior::TPtr> Operators;
+ THashMap<ui64, ITransactionOperator::TPtr> Operators;
private:
ui64 GetAllowedStep() const;
@@ -121,8 +125,8 @@ private:
public:
TTxController(TColumnShard& owner);
- ITransactionOperatior::TPtr GetTxOperator(const ui64 txId);
- ITransactionOperatior::TPtr GetVerifiedTxOperator(const ui64 txId);
+ ITransactionOperator::TPtr GetTxOperator(const ui64 txId);
+ ITransactionOperator::TPtr GetVerifiedTxOperator(const ui64 txId);
ui64 GetMemoryUsage() const;
bool HaveOutdatedTxs() const;
@@ -132,7 +136,8 @@ public:
TTxInfo RegisterTx(const ui64 txId, const NKikimrTxColumnShard::ETransactionKind& txKind, const TString& txBody, const TActorId& source, const ui64 cookie, NTabletFlatExecutor::TTransactionContext& txc);
TTxInfo RegisterTxWithDeadline(const ui64 txId, const NKikimrTxColumnShard::ETransactionKind& txKind, const TString& txBody, const TActorId& source, const ui64 cookie, NTabletFlatExecutor::TTransactionContext& txc);
- bool CancelTx(const ui64 txId, NTabletFlatExecutor::TTransactionContext& txc);
+ bool ExecuteOnCancel(const ui64 txId, NTabletFlatExecutor::TTransactionContext& txc);
+ bool CompleteOnCancel(const ui64 txId, const TActorContext& ctx);
std::optional<TTxInfo> StartPlannedTx();
void FinishPlannedTx(const ui64 txId, NTabletFlatExecutor::TTransactionContext& txc);
diff --git a/ydb/core/tx/columnshard/ut_rw/ut_backup.cpp b/ydb/core/tx/columnshard/ut_rw/ut_backup.cpp
index 377b9de19ce..a2f5b502d44 100644
--- a/ydb/core/tx/columnshard/ut_rw/ut_backup.cpp
+++ b/ydb/core/tx/columnshard/ut_rw/ut_backup.cpp
@@ -1,10 +1,12 @@
-#include "columnshard_ut_common.h"
+#include <ydb/core/tx/columnshard/test_helper/columnshard_ut_common.h>
#include <ydb/core/tx/columnshard/hooks/abstract/abstract.h>
#include <ydb/core/tx/columnshard/hooks/testing/controller.h>
#include <ydb/core/tx/columnshard/test_helper/controllers.h>
+#include <ydb/core/tx/columnshard/columnshard.h>
#include <ydb/core/tx/columnshard/operations/write_data.h>
+#include <ydb/core/tx/tx_processing.h>
#include <ydb/core/wrappers/fake_storage.h>
diff --git a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
index 65277b9f151..87516e83008 100644
--- a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
+++ b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
@@ -1,10 +1,11 @@
-#include "columnshard_ut_common.h"
+#include <ydb/core/tx/columnshard/test_helper/columnshard_ut_common.h>
#include <ydb/core/kqp/compute_actor/kqp_compute_events.h>
#include <ydb/core/base/blobstorage.h>
#include <util/string/printf.h>
#include <arrow/api.h>
#include <arrow/ipc/reader.h>
#include <ydb/library/yverify_stream/yverify_stream.h>
+#include <ydb/core/tx/columnshard/columnshard_impl.h>
#include <ydb/core/tx/columnshard/engines/changes/with_appended.h>
#include <ydb/core/tx/columnshard/engines/changes/compaction.h>
#include <ydb/core/tx/columnshard/engines/changes/cleanup_portions.h>
@@ -12,6 +13,7 @@
#include <ydb/core/tx/columnshard/hooks/abstract/abstract.h>
#include <ydb/core/tx/columnshard/hooks/testing/controller.h>
#include <ydb/core/tx/columnshard/engines/portions/portion_info.h>
+#include <ydb/core/tx/columnshard/test_helper/controllers.h>
#include <ydb/core/tx/columnshard/common/tests/shard_reader.h>
#include <ydb/library/actors/protos/unittests.pb.h>
#include <ydb/core/formats/arrow/simple_builder/filler.h>
@@ -2155,7 +2157,9 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
void TestCompactionSplitGranuleImpl(const TestTableDescription& table, const TTestBlobOptions& testBlobOptions = {}) {
TTestBasicRuntime runtime;
TTester::Setup(runtime);
- auto csDefaultControllerGuard = NKikimr::NYDBTest::TControllers::RegisterCSControllerGuard<TDefaultTestsController>();
+ runtime.SetLogPriority(NKikimrServices::TX_COLUMNSHARD_SCAN, NActors::NLog::PRI_NOTICE);
+ auto csDefaultControllerGuard = NKikimr::NYDBTest::TControllers::RegisterCSControllerGuard<NOlap::TWaitCompactionController>();
+ csDefaultControllerGuard->SetSmallSizeDetector(1LLU << 20);
TActorId sender = runtime.AllocateEdgeActor();
CreateTestBootstrapper(runtime, CreateTestTabletInfo(TTestTxConfig::TxTablet0, TTabletTypes::ColumnShard), &CreateColumnShard);
@@ -2273,7 +2277,9 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
RebootTablet(runtime, TTestTxConfig::TxTablet0, sender);
}
- { // Get index stats
+ const TInstant start = TInstant::Now();
+ bool success = false;
+ while (!success && TInstant::Now() - start < TDuration::Seconds(30)) { // Get index stats
ScanIndexStats(runtime, sender, {tableId, 42}, NOlap::TSnapshot(planStep, txId), 0);
auto scanInited = runtime.GrabEdgeEvent<NKqp::TEvKqpCompute::TEvScanInitActor>(handle);
auto& msg = scanInited->Record;
@@ -2283,7 +2289,6 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
ui64 sumCompactedRows = 0;
ui64 sumInsertedBytes = 0;
ui64 sumInsertedRows = 0;
- std::optional<ui32> keyColumnId;
while (true) {
ui32 resultLimit = 1024 * 1024;
runtime.Send(new IEventHandle(scanActorId, sender, new NKqp::TEvKqpCompute::TEvScanDataAck(resultLimit, 0, 1)));
@@ -2294,15 +2299,12 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
}
UNIT_ASSERT(scan->ArrowBatch);
auto batchStats = NArrow::ToBatch(scan->ArrowBatch, true);
- // Cerr << batchStats->ToString() << Endl;
-
for (ui32 i = 0; i < batchStats->num_rows(); ++i) {
auto paths = batchStats->GetColumnByName("PathId");
auto kinds = batchStats->GetColumnByName("Kind");
auto rows = batchStats->GetColumnByName("Rows");
- auto bytes = batchStats->GetColumnByName("BlobRangeSize");
- auto rawBytes = batchStats->GetColumnByName("RawBytes");
- auto internalColumnIds = batchStats->GetColumnByName("InternalEntityId");
+ auto bytes = batchStats->GetColumnByName("ColumnBlobBytes");
+ auto rawBytes = batchStats->GetColumnByName("ColumnRawBytes");
auto activities = batchStats->GetColumnByName("Activity");
AFL_VERIFY(activities);
@@ -2312,30 +2314,21 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
ui64 numRows = static_cast<arrow::UInt64Array&>(*rows).Value(i);
ui64 numBytes = static_cast<arrow::UInt64Array&>(*bytes).Value(i);
ui64 numRawBytes = static_cast<arrow::UInt64Array&>(*rawBytes).Value(i);
- ui32 internalColumnId = static_cast<arrow::UInt32Array&>(*internalColumnIds).Value(i);
bool activity = static_cast<arrow::BooleanArray&>(*activities).Value(i);
if (!activity) {
continue;
}
- if (!keyColumnId) {
- keyColumnId = internalColumnId;
- }
Cerr << "[" << __LINE__ << "] " << activity << " " << table.Pk[0].GetType().GetTypeId() << " "
<< pathId << " " << kindStr << " " << numRows << " " << numBytes << " " << numRawBytes << "\n";
if (pathId == tableId) {
- if (kindStr == ::ToString(NOlap::NPortion::EProduced::COMPACTED) || kindStr == ::ToString(NOlap::NPortion::EProduced::SPLIT_COMPACTED)) {
+ if (kindStr == ::ToString(NOlap::NPortion::EProduced::COMPACTED) || kindStr == ::ToString(NOlap::NPortion::EProduced::SPLIT_COMPACTED) || numBytes > (4LLU << 20)) {
sumCompactedBytes += numBytes;
- if (*keyColumnId == internalColumnId) {
- sumCompactedRows += numRows;
- }
+ sumCompactedRows += numRows;
//UNIT_ASSERT(numRawBytes > numBytes);
- }
- if (kindStr == ::ToString(NOlap::NPortion::EProduced::INSERTED)) {
+ } else if (kindStr == ::ToString(NOlap::NPortion::EProduced::INSERTED)) {
sumInsertedBytes += numBytes;
- if (*keyColumnId == internalColumnId) {
- sumInsertedRows += numRows;
- }
+ sumInsertedRows += numRows;
//UNIT_ASSERT(numRawBytes > numBytes);
}
} else {
@@ -2346,12 +2339,16 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
}
}
Cerr << "compacted=" << sumCompactedRows << ";inserted=" << sumInsertedRows << ";expected=" << fullNumRows << ";" << Endl;
- RebootTablet(runtime, TTestTxConfig::TxTablet0, sender);
- AFL_VERIFY(sumCompactedRows == fullNumRows)("sum", sumCompactedRows)("full", fullNumRows);
- UNIT_ASSERT(sumCompactedRows < sumCompactedBytes);
- UNIT_ASSERT(sumInsertedRows == 0);
- UNIT_ASSERT(sumInsertedBytes == 0);
+ if (!sumInsertedRows && sumCompactedRows == fullNumRows) {
+ success = true;
+ RebootTablet(runtime, TTestTxConfig::TxTablet0, sender);
+ UNIT_ASSERT(sumCompactedRows < sumCompactedBytes);
+ UNIT_ASSERT(sumInsertedBytes == 0);
+ } else {
+ Wakeup(runtime, sender, TTestTxConfig::TxTablet0);
+ }
}
+ AFL_VERIFY(success);
}
void TestCompactionSplitGranule(const TTypeId typeId) {
diff --git a/ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp b/ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp
index 29c748866ea..bda23a87d81 100644
--- a/ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp
+++ b/ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp
@@ -1,4 +1,5 @@
-#include "columnshard_ut_common.h"
+#include <ydb/core/tx/columnshard/columnshard_schema.h>
+#include <ydb/core/tx/columnshard/test_helper/columnshard_ut_common.h>
#include <ydb/core/tx/columnshard/hooks/abstract/abstract.h>
#include <ydb/core/tx/columnshard/hooks/testing/controller.h>
@@ -233,6 +234,11 @@ Y_UNIT_TEST_SUITE(Normalizers) {
{
auto readResult = ReadAllAsBatch(runtime, tableId, NOlap::TSnapshot(11, txId), schema);
UNIT_ASSERT_VALUES_EQUAL(readResult->num_rows(), 20048);
+ while (!csControllerGuard->GetInsertFinishedCounter().Val()) {
+ Cerr << csControllerGuard->GetInsertStartedCounter().Val() << Endl;
+ Wakeup(runtime, sender, TTestTxConfig::TxTablet0);
+ runtime.SimulateSleep(TDuration::Seconds(1));
+ }
}
RebootTablet(runtime, TTestTxConfig::TxTablet0, sender);
diff --git a/ydb/core/tx/columnshard/ut_rw/ya.make b/ydb/core/tx/columnshard/ut_rw/ya.make
index 8f5af7869b5..d03099069b7 100644
--- a/ydb/core/tx/columnshard/ut_rw/ya.make
+++ b/ydb/core/tx/columnshard/ut_rw/ya.make
@@ -32,7 +32,6 @@ PEERDIR(
YQL_LAST_ABI_VERSION()
SRCS(
- columnshard_ut_common.cpp
ut_columnshard_read_write.cpp
ut_normalizer.cpp
ut_backup.cpp
diff --git a/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp b/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp
index 7574f94793c..caa17144253 100644
--- a/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp
+++ b/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp
@@ -1,4 +1,4 @@
-#include "columnshard_ut_common.h"
+#include <ydb/core/tx/columnshard/test_helper/columnshard_ut_common.h>
#include <ydb/core/base/tablet.h>
#include <ydb/core/wrappers/s3_wrapper.h>
#include <ydb/services/metadata/service.h>
diff --git a/ydb/core/tx/columnshard/ut_schema/ya.make b/ydb/core/tx/columnshard/ut_schema/ya.make
index d3b7fdd5d84..35d906ee205 100644
--- a/ydb/core/tx/columnshard/ut_schema/ya.make
+++ b/ydb/core/tx/columnshard/ut_schema/ya.make
@@ -30,7 +30,6 @@ PEERDIR(
YQL_LAST_ABI_VERSION()
SRCS(
- columnshard_ut_common.cpp
ut_columnshard_schema.cpp
)
diff --git a/ydb/core/tx/schemeshard/ut_olap/ut_olap.cpp b/ydb/core/tx/schemeshard/ut_olap/ut_olap.cpp
index 31022e75558..c53fdeb5842 100644
--- a/ydb/core/tx/schemeshard/ut_olap/ut_olap.cpp
+++ b/ydb/core/tx/schemeshard/ut_olap/ut_olap.cpp
@@ -1,6 +1,6 @@
#include <ydb/core/tx/schemeshard/ut_helpers/helpers.h>
#include <ydb/core/tx/columnshard/columnshard.h>
-#include <ydb/core/tx/columnshard/columnshard_ut_common.h>
+#include <ydb/core/tx/columnshard/test_helper/columnshard_ut_common.h>
#include <ydb/core/formats/arrow/arrow_helpers.h>
#include <ydb/core/formats/arrow/arrow_batch_builder.h>
diff --git a/ydb/core/tx/schemeshard/ut_olap/ya.make b/ydb/core/tx/schemeshard/ut_olap/ya.make
index c502f450c32..1393af86188 100644
--- a/ydb/core/tx/schemeshard/ut_olap/ya.make
+++ b/ydb/core/tx/schemeshard/ut_olap/ya.make
@@ -30,7 +30,6 @@ YQL_LAST_ABI_VERSION()
SRCS(
ut_olap.cpp
- ydb/core/tx/columnshard/columnshard_ut_common.cpp
)
END()