diff options
author | Aleksandr Dmitriev <monster@ydb.tech> | 2024-09-24 15:41:57 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-09-24 15:41:57 +0300 |
commit | 63a6a817b08b2de862bead7ebd448e9d721c4a4f (patch) | |
tree | e8e7584f67e7bf99c9fe2394aacc153f60555dc7 | |
parent | b153534a8d20b215da2e542e4b6ef09f25a306f2 (diff) | |
download | ydb-63a6a817b08b2de862bead7ebd448e9d721c4a4f.tar.gz |
refactor statistics tests (#9654)
17 files changed, 317 insertions, 466 deletions
diff --git a/ydb/core/kqp/ut/query/kqp_analyze_ut.cpp b/ydb/core/kqp/ut/query/kqp_analyze_ut.cpp index e4708c2676..9342f75de4 100644 --- a/ydb/core/kqp/ut/query/kqp_analyze_ut.cpp +++ b/ydb/core/kqp/ut/query/kqp_analyze_ut.cpp @@ -19,7 +19,8 @@ Y_UNIT_TEST_SUITE(KqpAnalyze) { using namespace NStat; Y_UNIT_TEST_TWIN(AnalyzeTable, ColumnStore) { - TTestEnv env(1, 1, 1, true); + TTestEnv env(1, 1, true); + CreateDatabase(env, "Database"); TTableClient client(env.GetDriver()); diff --git a/ydb/core/statistics/aggregator/aggregator_impl.cpp b/ydb/core/statistics/aggregator/aggregator_impl.cpp index ef16ee2265..9becddab5a 100644 --- a/ydb/core/statistics/aggregator/aggregator_impl.cpp +++ b/ydb/core/statistics/aggregator/aggregator_impl.cpp @@ -511,7 +511,11 @@ void TStatisticsAggregator::InitializeStatisticsTable() { if (!EnableColumnStatistics) { return; } - Register(CreateStatisticsTableCreator(std::make_unique<TEvStatistics::TEvStatTableCreationResponse>())); + if (!Database) { + return; + } + Register(CreateStatisticsTableCreator( + std::make_unique<TEvStatistics::TEvStatTableCreationResponse>(), Database)); } void TStatisticsAggregator::Navigate() { @@ -598,7 +602,7 @@ void TStatisticsAggregator::SaveStatisticsToTable() { data.push_back(strSketch); } - Register(CreateSaveStatisticsQuery(SelfId(), + Register(CreateSaveStatisticsQuery(SelfId(), Database, TraversalPathId, EStatType::COUNT_MIN_SKETCH, std::move(columnTags), std::move(data))); } @@ -610,7 +614,7 @@ void TStatisticsAggregator::DeleteStatisticsFromTable() { PendingDeleteStatistics = false; - Register(CreateDeleteStatisticsQuery(SelfId(), TraversalPathId)); + Register(CreateDeleteStatisticsQuery(SelfId(), Database, TraversalPathId)); } void TStatisticsAggregator::ScheduleNextAnalyze(NIceDb::TNiceDb& db) { diff --git a/ydb/core/statistics/aggregator/tx_configure.cpp b/ydb/core/statistics/aggregator/tx_configure.cpp index 391324b0b0..4f0790b02e 100644 --- a/ydb/core/statistics/aggregator/tx_configure.cpp +++ b/ydb/core/statistics/aggregator/tx_configure.cpp @@ -22,8 +22,13 @@ struct TStatisticsAggregator::TTxConfigure : public TTxBase { NIceDb::TNiceDb db(txc.DB); + bool needInitialize = !Self->Database; Self->Database = Record.GetDatabase(); Self->PersistSysParam(db, Schema::SysParam_Database, Self->Database); + + if (needInitialize) { + Self->InitializeStatisticsTable(); + } return true; } diff --git a/ydb/core/statistics/aggregator/tx_init.cpp b/ydb/core/statistics/aggregator/tx_init.cpp index ae8a8a355e..fc53cf6cf7 100644 --- a/ydb/core/statistics/aggregator/tx_init.cpp +++ b/ydb/core/statistics/aggregator/tx_init.cpp @@ -287,7 +287,9 @@ struct TStatisticsAggregator::TTxInit : public TTxBase { SA_LOG_W("[" << Self->TabletID() << "] TTxInit::Complete. EnableColumnStatistics=false"); } - Self->InitializeStatisticsTable(); + if (Self->Database) { + Self->InitializeStatisticsTable(); + } if (Self->TraversalPathId && Self->TraversalStartKey) { SA_LOG_D("[" << Self->TabletID() << "] TTxInit::Complete. Start navigate. PathId " << Self->TraversalPathId); diff --git a/ydb/core/statistics/aggregator/ut/ut_analyze_columnshard.cpp b/ydb/core/statistics/aggregator/ut/ut_analyze_columnshard.cpp index d1c9f32ee4..37790542c8 100644 --- a/ydb/core/statistics/aggregator/ut/ut_analyze_columnshard.cpp +++ b/ydb/core/statistics/aggregator/ut/ut_analyze_columnshard.cpp @@ -9,8 +9,6 @@ namespace NKikimr { namespace NStat { - - Y_UNIT_TEST_SUITE(AnalyzeColumnshard) { Y_UNIT_TEST(AnalyzeTable) { TTestEnv env(1, 1); diff --git a/ydb/core/statistics/aggregator/ut/ut_analyze_datashard.cpp b/ydb/core/statistics/aggregator/ut/ut_analyze_datashard.cpp index 4c1bf7a7d4..3d4c8e9fcd 100644 --- a/ydb/core/statistics/aggregator/ut/ut_analyze_datashard.cpp +++ b/ydb/core/statistics/aggregator/ut/ut_analyze_datashard.cpp @@ -7,35 +7,22 @@ #include <ydb/core/statistics/events.h> #include <ydb/core/statistics/service/service.h> -#include <thread> - namespace NKikimr { namespace NStat { -namespace { - - -} // namespace - Y_UNIT_TEST_SUITE(AnalyzeDatashard) { Y_UNIT_TEST(AnalyzeOneTable) { TTestEnv env(1, 1); - auto init = [&] () { - CreateDatabase(env, "Database"); - CreateUniformTable(env, "Database", "Table"); - }; - std::thread initThread(init); auto& runtime = *env.GetServer().GetRuntime(); - runtime.SimulateSleep(TDuration::Seconds(5)); - initThread.join(); + + CreateDatabase(env, "Database"); + CreateUniformTable(env, "Database", "Table"); ui64 saTabletId; auto pathId = ResolvePathId(runtime, "/Root/Database/Table", nullptr, &saTabletId); - runtime.SimulateSleep(TDuration::Seconds(30)); - Analyze(runtime, saTabletId, {{pathId}}); ValidateCountMinDatashardAbsense(runtime, pathId); @@ -43,20 +30,12 @@ Y_UNIT_TEST_SUITE(AnalyzeDatashard) { Y_UNIT_TEST(AnalyzeTwoTables) { TTestEnv env(1, 1); - auto init = [&] () { - CreateDatabase(env, "Database"); - CreateUniformTable(env, "Database", "Table1"); - CreateUniformTable(env, "Database", "Table2"); - }; - // TODO remove thread - std::thread initThread(init); auto& runtime = *env.GetServer().GetRuntime(); - runtime.SimulateSleep(TDuration::Seconds(5)); - initThread.join(); - // TODO remove sleep - runtime.SimulateSleep(TDuration::Seconds(30)); + CreateDatabase(env, "Database"); + CreateUniformTable(env, "Database", "Table1"); + CreateUniformTable(env, "Database", "Table2"); ui64 saTabletId1; auto pathId1 = ResolvePathId(runtime, "/Root/Database/Table1", nullptr, &saTabletId1); @@ -68,34 +47,21 @@ Y_UNIT_TEST_SUITE(AnalyzeDatashard) { ValidateCountMinDatashardAbsense(runtime, pathId2); } - Y_UNIT_TEST(DropTableNavigateError) { TTestEnv env(1, 1); - auto init = [&] () { - CreateDatabase(env, "Database"); - CreateUniformTable(env, "Database", "Table"); - }; - std::thread initThread(init); auto& runtime = *env.GetServer().GetRuntime(); - runtime.SimulateSleep(TDuration::Seconds(5)); - initThread.join(); + + CreateDatabase(env, "Database"); + CreateUniformTable(env, "Database", "Table"); ui64 saTabletId = 0; auto pathId = ResolvePathId(runtime, "/Root/Database/Table", nullptr, &saTabletId); - auto init2 = [&] () { - DropTable(env, "Database", "Table"); - }; - std::thread init2Thread(init2); - - runtime.SimulateSleep(TDuration::Seconds(5)); - init2Thread.join(); + DropTable(env, "Database", "Table"); Analyze(runtime, saTabletId, {pathId}); - runtime.SimulateSleep(TDuration::Seconds(10)); - ValidateCountMinDatashardAbsense(runtime, pathId); } } diff --git a/ydb/core/statistics/aggregator/ut/ut_traverse_columnshard.cpp b/ydb/core/statistics/aggregator/ut/ut_traverse_columnshard.cpp index efd8c8015f..f9f46f5d5c 100644 --- a/ydb/core/statistics/aggregator/ut/ut_traverse_columnshard.cpp +++ b/ydb/core/statistics/aggregator/ut/ut_traverse_columnshard.cpp @@ -43,6 +43,18 @@ Y_UNIT_TEST_SUITE(TraverseColumnShard) { UNIT_ASSERT(CheckCountMinSketch(countMin, ColumnTableRowsNumber)); } + Y_UNIT_TEST(TraverseServerlessColumnTable) { + TTestEnv env(1, 1); + auto& runtime = *env.GetServer().GetRuntime(); + auto tableInfo = CreateServerlessDatabaseColumnTables(env, 1, 10)[0]; + + WaitForSavedStatistics(runtime, tableInfo.PathId); + + auto countMin = ExtractCountMin(runtime, tableInfo.PathId); + + UNIT_ASSERT(CheckCountMinSketch(countMin, ColumnTableRowsNumber)); + } + Y_UNIT_TEST(TraverseColumnTableRebootColumnshard) { TTestEnv env(1, 1); auto& runtime = *env.GetServer().GetRuntime(); diff --git a/ydb/core/statistics/aggregator/ut/ut_traverse_datashard.cpp b/ydb/core/statistics/aggregator/ut/ut_traverse_datashard.cpp index 37bba99f47..039334cf15 100644 --- a/ydb/core/statistics/aggregator/ut/ut_traverse_datashard.cpp +++ b/ydb/core/statistics/aggregator/ut/ut_traverse_datashard.cpp @@ -7,31 +7,17 @@ #include <ydb/core/statistics/events.h> #include <ydb/core/statistics/service/service.h> -#include <thread> - namespace NKikimr { namespace NStat { -namespace { - - -} // namespace - Y_UNIT_TEST_SUITE(TraverseDatashard) { Y_UNIT_TEST(TraverseOneTable) { TTestEnv env(1, 1); - auto init = [&] () { - CreateDatabase(env, "Database"); - CreateUniformTable(env, "Database", "Table"); - }; - std::thread initThread(init); - auto& runtime = *env.GetServer().GetRuntime(); - runtime.SimulateSleep(TDuration::Seconds(5)); - initThread.join(); - runtime.SimulateSleep(TDuration::Seconds(60)); + CreateDatabase(env, "Database"); + CreateUniformTable(env, "Database", "Table"); auto pathId = ResolvePathId(runtime, "/Root/Database/Table"); ValidateCountMinDatashardAbsense(runtime, pathId); @@ -39,18 +25,11 @@ Y_UNIT_TEST_SUITE(TraverseDatashard) { Y_UNIT_TEST(TraverseTwoTables) { TTestEnv env(1, 1); - auto init = [&] () { - CreateDatabase(env, "Database"); - CreateUniformTable(env, "Database", "Table1"); - CreateUniformTable(env, "Database", "Table2"); - }; - std::thread initThread(init); - auto& runtime = *env.GetServer().GetRuntime(); - runtime.SimulateSleep(TDuration::Seconds(5)); - initThread.join(); - runtime.SimulateSleep(TDuration::Seconds(60)); + CreateDatabase(env, "Database"); + CreateUniformTable(env, "Database", "Table1"); + CreateUniformTable(env, "Database", "Table2"); auto pathId1 = ResolvePathId(runtime, "/Root/Database/Table1"); auto pathId2 = ResolvePathId(runtime, "/Root/Database/Table2"); @@ -60,29 +39,11 @@ Y_UNIT_TEST_SUITE(TraverseDatashard) { Y_UNIT_TEST(TraverseOneTableServerless) { TTestEnv env(1, 1); - - auto init = [&] () { - CreateDatabase(env, "Shared"); - }; - std::thread initThread(init); - auto& runtime = *env.GetServer().GetRuntime(); - runtime.SimulateSleep(TDuration::Seconds(5)); - initThread.join(); - - TPathId domainKey; - ResolvePathId(runtime, "/Root/Shared", &domainKey); - - auto init2 = [&] () { - CreateServerlessDatabase(env, "Serverless", domainKey); - CreateUniformTable(env, "Serverless", "Table"); - }; - std::thread init2Thread(init2); - - runtime.SimulateSleep(TDuration::Seconds(5)); - init2Thread.join(); - runtime.SimulateSleep(TDuration::Seconds(60)); + CreateDatabase(env, "Shared", 1, true); + CreateServerlessDatabase(env, "Serverless", "/Root/Shared"); + CreateUniformTable(env, "Serverless", "Table"); auto pathId = ResolvePathId(runtime, "/Root/Serverless/Table"); ValidateCountMinDatashardAbsense(runtime, pathId); @@ -90,30 +51,12 @@ Y_UNIT_TEST_SUITE(TraverseDatashard) { Y_UNIT_TEST(TraverseTwoTablesServerless) { TTestEnv env(1, 1); - - auto init = [&] () { - CreateDatabase(env, "Shared"); - }; - std::thread initThread(init); - auto& runtime = *env.GetServer().GetRuntime(); - runtime.SimulateSleep(TDuration::Seconds(5)); - initThread.join(); - - TPathId domainKey; - ResolvePathId(runtime, "/Root/Shared", &domainKey); - - auto init2 = [&] () { - CreateServerlessDatabase(env, "Serverless", domainKey); - CreateUniformTable(env, "Serverless", "Table1"); - CreateUniformTable(env, "Serverless", "Table2"); - }; - std::thread init2Thread(init2); - - runtime.SimulateSleep(TDuration::Seconds(5)); - init2Thread.join(); - runtime.SimulateSleep(TDuration::Seconds(60)); + CreateDatabase(env, "Shared", 1, true); + CreateServerlessDatabase(env, "Serverless", "/Root/Shared"); + CreateUniformTable(env, "Serverless", "Table1"); + CreateUniformTable(env, "Serverless", "Table2"); auto pathId1 = ResolvePathId(runtime, "/Root/Serverless/Table1"); auto pathId2 = ResolvePathId(runtime, "/Root/Serverless/Table2"); @@ -123,31 +66,13 @@ Y_UNIT_TEST_SUITE(TraverseDatashard) { Y_UNIT_TEST(TraverseTwoTablesTwoServerlessDbs) { TTestEnv env(1, 1); - - auto init = [&] () { - CreateDatabase(env, "Shared"); - }; - std::thread initThread(init); - auto& runtime = *env.GetServer().GetRuntime(); - runtime.SimulateSleep(TDuration::Seconds(5)); - initThread.join(); - - TPathId domainKey; - ResolvePathId(runtime, "/Root/Shared", &domainKey); - - auto init2 = [&] () { - CreateServerlessDatabase(env, "Serverless1", domainKey); - CreateServerlessDatabase(env, "Serverless2", domainKey); - CreateUniformTable(env, "Serverless1", "Table1"); - CreateUniformTable(env, "Serverless2", "Table2"); - }; - std::thread init2Thread(init2); - - runtime.SimulateSleep(TDuration::Seconds(5)); - init2Thread.join(); - runtime.SimulateSleep(TDuration::Seconds(60)); + CreateDatabase(env, "Shared", 1, true); + CreateServerlessDatabase(env, "Serverless1", "/Root/Shared"); + CreateServerlessDatabase(env, "Serverless2", "/Root/Shared"); + CreateUniformTable(env, "Serverless1", "Table1"); + CreateUniformTable(env, "Serverless2", "Table2"); auto pathId1 = ResolvePathId(runtime, "/Root/Serverless1/Table1"); auto pathId2 = ResolvePathId(runtime, "/Root/Serverless2/Table2"); diff --git a/ydb/core/statistics/database/database.cpp b/ydb/core/statistics/database/database.cpp index 0a80147e24..41b67c5b03 100644 --- a/ydb/core/statistics/database/database.cpp +++ b/ydb/core/statistics/database/database.cpp @@ -10,8 +10,9 @@ namespace NKikimr::NStat { class TStatisticsTableCreator : public TActorBootstrapped<TStatisticsTableCreator> { public: - explicit TStatisticsTableCreator(std::unique_ptr<NActors::IEventBase> resultEvent) + explicit TStatisticsTableCreator(std::unique_ptr<NActors::IEventBase> resultEvent, const TString& database) : ResultEvent(std::move(resultEvent)) + , Database(database) {} void Registered(NActors::TActorSystem* sys, const NActors::TActorId& owner) override { @@ -38,6 +39,7 @@ public: { "owner_id", "local_path_id", "stat_type", "column_tag"}, NKikimrServices::STATISTICS, Nothing(), + Database, true, std::move(partitioningPolicy) ) @@ -67,11 +69,12 @@ private: private: std::unique_ptr<NActors::IEventBase> ResultEvent; + const TString Database; NActors::TActorId Owner; }; -NActors::IActor* CreateStatisticsTableCreator(std::unique_ptr<NActors::IEventBase> event) { - return new TStatisticsTableCreator(std::move(event)); +NActors::IActor* CreateStatisticsTableCreator(std::unique_ptr<NActors::IEventBase> event, const TString& database) { + return new TStatisticsTableCreator(std::move(event), database); } @@ -83,9 +86,9 @@ private: const std::vector<TString> Data; public: - TSaveStatisticsQuery(const TPathId& pathId, ui64 statType, + TSaveStatisticsQuery(const TString& database, const TPathId& pathId, ui64 statType, const std::vector<ui32>& columnTags, const std::vector<TString>& data) - : NKikimr::TQueryBase(NKikimrServices::STATISTICS, {}, {}, true) + : NKikimr::TQueryBase(NKikimrServices::STATISTICS, {}, database, true) , PathId(pathId) , StatType(statType) , ColumnTags(columnTags) @@ -160,6 +163,7 @@ public: class TSaveStatisticsRetryingQuery : public TActorBootstrapped<TSaveStatisticsRetryingQuery> { private: const NActors::TActorId ReplyActorId; + const TString Database; const TPathId PathId; const ui64 StatType; const std::vector<ui32> ColumnTags; @@ -168,11 +172,12 @@ private: public: using TSaveRetryingQuery = TQueryRetryActor< TSaveStatisticsQuery, TEvStatistics::TEvSaveStatisticsQueryResponse, - const TPathId&, ui64, const std::vector<ui32>&, const std::vector<TString>&>; + const TString&, const TPathId&, ui64, const std::vector<ui32>&, const std::vector<TString>&>; - TSaveStatisticsRetryingQuery(const NActors::TActorId& replyActorId, + TSaveStatisticsRetryingQuery(const NActors::TActorId& replyActorId, const TString& database, const TPathId& pathId, ui64 statType, std::vector<ui32>&& columnTags, std::vector<TString>&& data) : ReplyActorId(replyActorId) + , Database(database) , PathId(pathId) , StatType(statType) , ColumnTags(std::move(columnTags)) @@ -186,7 +191,7 @@ public: TSaveRetryingQuery::Retryable, TDuration::MilliSeconds(10), TDuration::MilliSeconds(200), TDuration::Seconds(1), std::numeric_limits<size_t>::max(), TDuration::Seconds(1)), - PathId, StatType, ColumnTags, Data + Database, PathId, StatType, ColumnTags, Data )); Become(&TSaveStatisticsRetryingQuery::StateFunc); } @@ -201,10 +206,10 @@ public: } }; -NActors::IActor* CreateSaveStatisticsQuery(const NActors::TActorId& replyActorId, +NActors::IActor* CreateSaveStatisticsQuery(const NActors::TActorId& replyActorId, const TString& database, const TPathId& pathId, ui64 statType, std::vector<ui32>&& columnTags, std::vector<TString>&& data) { - return new TSaveStatisticsRetryingQuery(replyActorId, pathId, statType, std::move(columnTags), std::move(data)); + return new TSaveStatisticsRetryingQuery(replyActorId, database, pathId, statType, std::move(columnTags), std::move(data)); } @@ -218,8 +223,8 @@ private: std::optional<TString> Data; public: - TLoadStatisticsQuery(const TPathId& pathId, ui64 statType, ui32 columnTag, ui64 cookie) - : NKikimr::TQueryBase(NKikimrServices::STATISTICS, {}, {}, true) + TLoadStatisticsQuery(const TString& database, const TPathId& pathId, ui64 statType, ui32 columnTag, ui64 cookie) + : NKikimr::TQueryBase(NKikimrServices::STATISTICS, {}, database, true) , PathId(pathId) , StatType(statType) , ColumnTag(columnTag) @@ -293,6 +298,7 @@ public: class TLoadStatisticsRetryingQuery : public TActorBootstrapped<TLoadStatisticsRetryingQuery> { private: const NActors::TActorId ReplyActorId; + const TString Database; const TPathId PathId; const ui64 StatType; const ui32 ColumnTag; @@ -301,11 +307,12 @@ private: public: using TLoadRetryingQuery = TQueryRetryActor< TLoadStatisticsQuery, TEvStatistics::TEvLoadStatisticsQueryResponse, - const TPathId&, ui64, ui32, ui64>; + const TString&, const TPathId&, ui64, ui32, ui64>; - TLoadStatisticsRetryingQuery(const NActors::TActorId& replyActorId, + TLoadStatisticsRetryingQuery(const NActors::TActorId& replyActorId, const TString& database, const TPathId& pathId, ui64 statType, ui32 columnTag, ui64 cookie) : ReplyActorId(replyActorId) + , Database(database) , PathId(pathId) , StatType(statType) , ColumnTag(columnTag) @@ -319,7 +326,7 @@ public: TLoadRetryingQuery::Retryable, TDuration::MilliSeconds(10), TDuration::MilliSeconds(200), TDuration::Seconds(1), std::numeric_limits<size_t>::max(), TDuration::Seconds(1)), - PathId, StatType, ColumnTag, Cookie + Database, PathId, StatType, ColumnTag, Cookie )); Become(&TLoadStatisticsRetryingQuery::StateFunc); } @@ -335,9 +342,9 @@ public: }; NActors::IActor* CreateLoadStatisticsQuery(const NActors::TActorId& replyActorId, - const TPathId& pathId, ui64 statType, ui32 columnTag, ui64 cookie) + const TString& database, const TPathId& pathId, ui64 statType, ui32 columnTag, ui64 cookie) { - return new TLoadStatisticsRetryingQuery(replyActorId, pathId, statType, columnTag, cookie); + return new TLoadStatisticsRetryingQuery(replyActorId, database, pathId, statType, columnTag, cookie); } @@ -346,8 +353,8 @@ private: const TPathId PathId; public: - TDeleteStatisticsQuery(const TPathId& pathId) - : NKikimr::TQueryBase(NKikimrServices::STATISTICS, {}, {}, true) + TDeleteStatisticsQuery(const TString& database, const TPathId& pathId) + : NKikimr::TQueryBase(NKikimrServices::STATISTICS, {}, database, true) , PathId(pathId) { } @@ -392,15 +399,18 @@ public: class TDeleteStatisticsRetryingQuery : public TActorBootstrapped<TDeleteStatisticsRetryingQuery> { private: const NActors::TActorId ReplyActorId; + const TString Database; const TPathId PathId; public: using TDeleteRetryingQuery = TQueryRetryActor< TDeleteStatisticsQuery, TEvStatistics::TEvDeleteStatisticsQueryResponse, - const TPathId&>; + const TString&, const TPathId&>; - TDeleteStatisticsRetryingQuery(const NActors::TActorId& replyActorId, const TPathId& pathId) + TDeleteStatisticsRetryingQuery(const NActors::TActorId& replyActorId, const TString& database, + const TPathId& pathId) : ReplyActorId(replyActorId) + , Database(database) , PathId(pathId) {} @@ -411,7 +421,7 @@ public: TDeleteRetryingQuery::Retryable, TDuration::MilliSeconds(10), TDuration::MilliSeconds(200), TDuration::Seconds(1), std::numeric_limits<size_t>::max(), TDuration::Seconds(1)), - PathId + Database, PathId )); Become(&TDeleteStatisticsRetryingQuery::StateFunc); } @@ -426,9 +436,10 @@ public: } }; -NActors::IActor* CreateDeleteStatisticsQuery(const NActors::TActorId& replyActorId, const TPathId& pathId) +NActors::IActor* CreateDeleteStatisticsQuery(const NActors::TActorId& replyActorId, const TString& database, + const TPathId& pathId) { - return new TDeleteStatisticsRetryingQuery(replyActorId, pathId); + return new TDeleteStatisticsRetryingQuery(replyActorId, database, pathId); } } // NKikimr::NStat diff --git a/ydb/core/statistics/database/database.h b/ydb/core/statistics/database/database.h index 4aef1c33cd..8f61d433dd 100644 --- a/ydb/core/statistics/database/database.h +++ b/ydb/core/statistics/database/database.h @@ -5,14 +5,15 @@ namespace NKikimr::NStat { -NActors::IActor* CreateStatisticsTableCreator(std::unique_ptr<NActors::IEventBase> event); +NActors::IActor* CreateStatisticsTableCreator(std::unique_ptr<NActors::IEventBase> event, const TString& database); -NActors::IActor* CreateSaveStatisticsQuery(const NActors::TActorId& replyActorId, +NActors::IActor* CreateSaveStatisticsQuery(const NActors::TActorId& replyActorId, const TString& database, const TPathId& pathId, ui64 statType, std::vector<ui32>&& columnTags, std::vector<TString>&& data); -NActors::IActor* CreateLoadStatisticsQuery(const NActors::TActorId& replyActorId, +NActors::IActor* CreateLoadStatisticsQuery(const NActors::TActorId& replyActorId, const TString& database, const TPathId& pathId, ui64 statType, ui32 columnTag, ui64 cookie); -NActors::IActor* CreateDeleteStatisticsQuery(const NActors::TActorId& replyActorId, const TPathId& pathId); +NActors::IActor* CreateDeleteStatisticsQuery(const NActors::TActorId& replyActorId, const TString& database, + const TPathId& pathId); }; diff --git a/ydb/core/statistics/database/ut/ut_database.cpp b/ydb/core/statistics/database/ut/ut_database.cpp index ff4c47a862..188f9e1c1e 100644 --- a/ydb/core/statistics/database/ut/ut_database.cpp +++ b/ydb/core/statistics/database/ut/ut_database.cpp @@ -12,41 +12,37 @@ namespace NKikimr::NStat { Y_UNIT_TEST_SUITE(StatisticsSaveLoad) { Y_UNIT_TEST(Simple) { TTestEnv env(1, 1); - auto init = [&] () { - CreateDatabase(env, "Database"); - }; - std::thread initThread(init); - auto& runtime = *env.GetServer().GetRuntime(); - runtime.SimulateSleep(TDuration::Seconds(5)); - initThread.join(); + + CreateDatabase(env, "Database"); auto sender = runtime.AllocateEdgeActor(0); - runtime.Register(CreateStatisticsTableCreator(std::make_unique<TEvStatistics::TEvStatTableCreationResponse>()), + runtime.Register(CreateStatisticsTableCreator( + std::make_unique<TEvStatistics::TEvStatTableCreationResponse>(), "/Root/Database"), 0, 0, TMailboxType::Simple, 0, sender); - runtime.GrabEdgeEvent<TEvStatistics::TEvStatTableCreationResponse>(sender); + runtime.GrabEdgeEventRethrow<TEvStatistics::TEvStatTableCreationResponse>(sender); TPathId pathId(1, 1); ui64 statType = 1; std::vector<ui32> columnTags = {1, 2}; std::vector<TString> data = {"dataA", "dataB"}; - runtime.Register(CreateSaveStatisticsQuery(sender, + runtime.Register(CreateSaveStatisticsQuery(sender, "/Root/Database", pathId, statType, std::move(columnTags), std::move(data)), 0, 0, TMailboxType::Simple, 0, sender); - auto saveResponse = runtime.GrabEdgeEvent<TEvStatistics::TEvSaveStatisticsQueryResponse>(sender); + auto saveResponse = runtime.GrabEdgeEventRethrow<TEvStatistics::TEvSaveStatisticsQueryResponse>(sender); UNIT_ASSERT(saveResponse->Get()->Success); - runtime.Register(CreateLoadStatisticsQuery(sender, pathId, statType, 1, 1), + runtime.Register(CreateLoadStatisticsQuery(sender, "/Root/Database", pathId, statType, 1, 1), 0, 0, TMailboxType::Simple, 0, sender); - auto loadResponseA = runtime.GrabEdgeEvent<TEvStatistics::TEvLoadStatisticsQueryResponse>(sender); + auto loadResponseA = runtime.GrabEdgeEventRethrow<TEvStatistics::TEvLoadStatisticsQueryResponse>(sender); UNIT_ASSERT(loadResponseA->Get()->Success); UNIT_ASSERT(loadResponseA->Get()->Data); UNIT_ASSERT_VALUES_EQUAL(*loadResponseA->Get()->Data, "dataA"); - runtime.Register(CreateLoadStatisticsQuery(sender, pathId, statType, 2, 1), + runtime.Register(CreateLoadStatisticsQuery(sender, "/Root/Database", pathId, statType, 2, 1), 0, 0, TMailboxType::Simple, 0, sender); - auto loadResponseB = runtime.GrabEdgeEvent<TEvStatistics::TEvLoadStatisticsQueryResponse>(sender); + auto loadResponseB = runtime.GrabEdgeEventRethrow<TEvStatistics::TEvLoadStatisticsQueryResponse>(sender); UNIT_ASSERT(loadResponseB->Get()->Success); UNIT_ASSERT(loadResponseB->Get()->Data); UNIT_ASSERT_VALUES_EQUAL(*loadResponseB->Get()->Data, "dataB"); @@ -54,17 +50,13 @@ Y_UNIT_TEST_SUITE(StatisticsSaveLoad) { Y_UNIT_TEST(Delete) { TTestEnv env(1, 1); - auto init = [&] () { - CreateDatabase(env, "Database"); - }; - std::thread initThread(init); - auto& runtime = *env.GetServer().GetRuntime(); - runtime.SimulateSleep(TDuration::Seconds(5)); - initThread.join(); + + CreateDatabase(env, "Database"); auto sender = runtime.AllocateEdgeActor(0); - runtime.Register(CreateStatisticsTableCreator(std::make_unique<TEvStatistics::TEvStatTableCreationResponse>()), + runtime.Register(CreateStatisticsTableCreator( + std::make_unique<TEvStatistics::TEvStatTableCreationResponse>(), "/Root/Database"), 0, 0, TMailboxType::Simple, 0, sender); runtime.GrabEdgeEvent<TEvStatistics::TEvStatTableCreationResponse>(sender); @@ -73,18 +65,18 @@ Y_UNIT_TEST_SUITE(StatisticsSaveLoad) { std::vector<ui32> columnTags = {1, 2}; std::vector<TString> data = {"dataA", "dataB"}; - runtime.Register(CreateSaveStatisticsQuery(sender, + runtime.Register(CreateSaveStatisticsQuery(sender, "/Root/Database", pathId, statType, std::move(columnTags), std::move(data)), 0, 0, TMailboxType::Simple, 0, sender); auto saveResponse = runtime.GrabEdgeEvent<TEvStatistics::TEvSaveStatisticsQueryResponse>(sender); UNIT_ASSERT(saveResponse->Get()->Success); - runtime.Register(CreateDeleteStatisticsQuery(sender, pathId), + runtime.Register(CreateDeleteStatisticsQuery(sender, "/Root/Database", pathId), 0, 0, TMailboxType::Simple, 0, sender); auto deleteResponse = runtime.GrabEdgeEvent<TEvStatistics::TEvDeleteStatisticsQueryResponse>(sender); UNIT_ASSERT(deleteResponse->Get()->Success); - runtime.Register(CreateLoadStatisticsQuery(sender, pathId, statType, 1, 1), + runtime.Register(CreateLoadStatisticsQuery(sender, "/Root/Database", pathId, statType, 1, 1), 0, 0, TMailboxType::Simple, 0, sender); auto loadResponseA = runtime.GrabEdgeEvent<TEvStatistics::TEvLoadStatisticsQueryResponse>(sender); UNIT_ASSERT(!loadResponseA->Get()->Success); @@ -92,15 +84,10 @@ Y_UNIT_TEST_SUITE(StatisticsSaveLoad) { Y_UNIT_TEST(ForbidAccess) { TTestEnv env(1, 1); - auto init = [&] () { - CreateDatabase(env, "Database"); - CreateUniformTable(env, "Database", "Table"); - }; - std::thread initThread(init); - auto& runtime = *env.GetServer().GetRuntime(); - runtime.SimulateSleep(TDuration::Seconds(10)); - initThread.join(); + + CreateDatabase(env, "Database", 1, true); + CreateUniformTable(env, "Database", "Table"); NYdb::EStatus status; auto test = [&] () { @@ -118,7 +105,7 @@ Y_UNIT_TEST_SUITE(StatisticsSaveLoad) { }; std::thread testThread(test); - runtime.SimulateSleep(TDuration::Seconds(10)); + runtime.SimulateSleep(TDuration::Seconds(1)); testThread.join(); UNIT_ASSERT_VALUES_EQUAL(status, NYdb::EStatus::SCHEME_ERROR); diff --git a/ydb/core/statistics/service/service_impl.cpp b/ydb/core/statistics/service/service_impl.cpp index ff6f6b8a79..f010494fee 100644 --- a/ydb/core/statistics/service/service_impl.cpp +++ b/ydb/core/statistics/service/service_impl.cpp @@ -685,7 +685,7 @@ private: } ui64 loadCookie = NextLoadQueryCookie++; LoadQueriesInFlight[loadCookie] = std::make_pair(requestId, reqIndex); - Register(CreateLoadStatisticsQuery(SelfId(), + Register(CreateLoadStatisticsQuery(SelfId(), "", req.PathId, request.StatType, *req.ColumnTag, loadCookie)); ++request.ReplyCounter; ++reqIndex; diff --git a/ydb/core/statistics/service/ut/ut_basic_statistics.cpp b/ydb/core/statistics/service/ut/ut_basic_statistics.cpp index 898e147cfa..310623fcaf 100644 --- a/ydb/core/statistics/service/ut/ut_basic_statistics.cpp +++ b/ydb/core/statistics/service/ut/ut_basic_statistics.cpp @@ -21,17 +21,13 @@ using namespace NYdb::NScheme; namespace { void CreateTable(TTestEnv& env, const TString& databaseName, const TString& tableName, size_t rowCount) { - TTableClient client(env.GetDriver()); - auto session = client.CreateSession().GetValueSync().GetSession(); - - auto result = session.ExecuteSchemeQuery(Sprintf(R"( + ExecuteYqlScript(env, Sprintf(R"( CREATE TABLE `Root/%s/%s` ( Key Uint64, Value Uint64, PRIMARY KEY (Key) ); - )", databaseName.c_str(), tableName.c_str())).GetValueSync(); - UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + )", databaseName.c_str(), tableName.c_str())); TStringBuilder replace; replace << Sprintf("REPLACE INTO `Root/%s/%s` (Key, Value) VALUES ", @@ -43,8 +39,7 @@ void CreateTable(TTestEnv& env, const TString& databaseName, const TString& tabl replace << Sprintf("(%uu, %uu)", i, i); } replace << ";"; - result = session.ExecuteDataQuery(replace, TTxControl::BeginTx().CommitTx()).GetValueSync(); - UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + ExecuteYqlScript(env, replace); } void ValidateRowCount(TTestActorRuntime& runtime, ui32 nodeIndex, TPathId pathId, size_t expectedRowCount) { @@ -76,7 +71,7 @@ void ValidateRowCount(TTestActorRuntime& runtime, ui32 nodeIndex, TPathId pathId break; } - runtime.SimulateSleep(TDuration::Seconds(5)); + runtime.SimulateSleep(TDuration::Seconds(1)); } } @@ -87,15 +82,10 @@ Y_UNIT_TEST_SUITE(BasicStatistics) { Y_UNIT_TEST(Simple) { TTestEnv env(1, 1); - auto init = [&] () { - CreateDatabase(env, "Database"); - CreateTable(env, "Database", "Table", 5); - }; - std::thread initThread(init); - auto& runtime = *env.GetServer().GetRuntime(); - runtime.SimulateSleep(TDuration::Seconds(5)); - initThread.join(); + + CreateDatabase(env, "Database"); + CreateTable(env, "Database", "Table", 5); auto pathId = ResolvePathId(runtime, "/Root/Database/Table"); ValidateRowCount(runtime, 1, pathId, 5); @@ -104,15 +94,10 @@ Y_UNIT_TEST_SUITE(BasicStatistics) { Y_UNIT_TEST(TwoNodes) { TTestEnv env(1, 2); - auto init = [&] () { - CreateDatabase(env, "Database", 2); - CreateTable(env, "Database", "Table", 5); - }; - std::thread initThread(init); - auto& runtime = *env.GetServer().GetRuntime(); - runtime.SimulateSleep(TDuration::Seconds(5)); - initThread.join(); + + CreateDatabase(env, "Database", 2); + CreateTable(env, "Database", "Table", 5); auto pathId1 = ResolvePathId(runtime, "/Root/Database/Table"); ValidateRowCount(runtime, 1, pathId1, 5); @@ -121,16 +106,12 @@ Y_UNIT_TEST_SUITE(BasicStatistics) { Y_UNIT_TEST(TwoTables) { TTestEnv env(1, 1); - auto init = [&] () { - CreateDatabase(env, "Database"); - CreateTable(env, "Database", "Table1", 5); - CreateTable(env, "Database", "Table2", 6); - }; - std::thread initThread(init); auto& runtime = *env.GetServer().GetRuntime(); - runtime.SimulateSleep(TDuration::Seconds(5)); - initThread.join(); + + CreateDatabase(env, "Database"); + CreateTable(env, "Database", "Table1", 5); + CreateTable(env, "Database", "Table2", 6); auto pathId1 = ResolvePathId(runtime, "/Root/Database/Table1"); auto pathId2 = ResolvePathId(runtime, "/Root/Database/Table2"); @@ -140,17 +121,13 @@ Y_UNIT_TEST_SUITE(BasicStatistics) { Y_UNIT_TEST(TwoDatabases) { TTestEnv env(1, 2); - auto init = [&] () { - CreateDatabase(env, "Database1"); - CreateDatabase(env, "Database2"); - CreateTable(env, "Database1", "Table1", 5); - CreateTable(env, "Database2", "Table2", 6); - }; - std::thread initThread(init); auto& runtime = *env.GetServer().GetRuntime(); - runtime.SimulateSleep(TDuration::Seconds(5)); - initThread.join(); + + CreateDatabase(env, "Database1", 1, false, "hdd1"); + CreateDatabase(env, "Database2", 1, false, "hdd2"); + CreateTable(env, "Database1", "Table1", 5); + CreateTable(env, "Database2", "Table2", 6); auto pathId1 = ResolvePathId(runtime, "/Root/Database1/Table1"); auto pathId2 = ResolvePathId(runtime, "/Root/Database2/Table2"); @@ -160,26 +137,12 @@ Y_UNIT_TEST_SUITE(BasicStatistics) { Y_UNIT_TEST(Serverless) { TTestEnv env(1, 1); - auto init = [&] () { - CreateDatabase(env, "Shared"); - }; - std::thread initThread(init); auto& runtime = *env.GetServer().GetRuntime(); - runtime.SimulateSleep(TDuration::Seconds(5)); - initThread.join(); - - TPathId domainKey; - ResolvePathId(runtime, "/Root/Shared", &domainKey); - - auto init2 = [&] () { - CreateServerlessDatabase(env, "Serverless", domainKey); - CreateTable(env, "Serverless", "Table", 5); - }; - std::thread init2Thread(init2); - runtime.SimulateSleep(TDuration::Seconds(5)); - init2Thread.join(); + CreateDatabase(env, "Shared", 1, true); + CreateServerlessDatabase(env, "Serverless", "/Root/Shared"); + CreateTable(env, "Serverless", "Table", 5); auto pathId = ResolvePathId(runtime, "/Root/Serverless/Table"); ValidateRowCount(runtime, 1, pathId, 5); @@ -187,28 +150,14 @@ Y_UNIT_TEST_SUITE(BasicStatistics) { Y_UNIT_TEST(TwoServerlessDbs) { TTestEnv env(1, 1); - auto init = [&] () { - CreateDatabase(env, "Shared"); - }; - std::thread initThread(init); auto& runtime = *env.GetServer().GetRuntime(); - runtime.SimulateSleep(TDuration::Seconds(5)); - initThread.join(); - TPathId domainKey; - ResolvePathId(runtime, "/Root/Shared", &domainKey); - - auto init2 = [&] () { - CreateServerlessDatabase(env, "Serverless1", domainKey); - CreateServerlessDatabase(env, "Serverless2", domainKey); - CreateTable(env, "Serverless1", "Table1", 5); - CreateTable(env, "Serverless2", "Table2", 6); - }; - std::thread init2Thread(init2); - - runtime.SimulateSleep(TDuration::Seconds(5)); - init2Thread.join(); + CreateDatabase(env, "Shared", 1, true); + CreateServerlessDatabase(env, "Serverless1", "/Root/Shared"); + CreateServerlessDatabase(env, "Serverless2", "/Root/Shared"); + CreateTable(env, "Serverless1", "Table1", 5); + CreateTable(env, "Serverless2", "Table2", 6); auto pathId1 = ResolvePathId(runtime, "/Root/Serverless1/Table1"); auto pathId2 = ResolvePathId(runtime, "/Root/Serverless2/Table2"); @@ -218,30 +167,15 @@ Y_UNIT_TEST_SUITE(BasicStatistics) { Y_UNIT_TEST(TwoServerlessTwoSharedDbs) { TTestEnv env(1, 2); - auto init = [&] () { - CreateDatabase(env, "Shared1"); - CreateDatabase(env, "Shared2"); - }; - std::thread initThread(init); auto& runtime = *env.GetServer().GetRuntime(); - runtime.SimulateSleep(TDuration::Seconds(5)); - initThread.join(); - - TPathId domainKey1, domainKey2; - ResolvePathId(runtime, "/Root/Shared1", &domainKey1); - ResolvePathId(runtime, "/Root/Shared2", &domainKey2); - - auto init2 = [&] () { - CreateServerlessDatabase(env, "Serverless1", domainKey1); - CreateServerlessDatabase(env, "Serverless2", domainKey2); - CreateTable(env, "Serverless1", "Table1", 5); - CreateTable(env, "Serverless2", "Table2", 6); - }; - std::thread init2Thread(init2); - - runtime.SimulateSleep(TDuration::Seconds(5)); - init2Thread.join(); + + CreateDatabase(env, "Shared1", 1, true, "hdd1"); + CreateDatabase(env, "Shared2", 1, true, "hdd2"); + CreateServerlessDatabase(env, "Serverless1", "/Root/Shared1"); + CreateServerlessDatabase(env, "Serverless2", "/Root/Shared2"); + CreateTable(env, "Serverless1", "Table1", 5); + CreateTable(env, "Serverless2", "Table2", 6); auto pathId1 = ResolvePathId(runtime, "/Root/Serverless1/Table1"); auto pathId2 = ResolvePathId(runtime, "/Root/Serverless2/Table2"); diff --git a/ydb/core/statistics/ut_common/ut_common.cpp b/ydb/core/statistics/ut_common/ut_common.cpp index 2737cb68a5..01d6ab7e1c 100644 --- a/ydb/core/statistics/ut_common/ut_common.cpp +++ b/ydb/core/statistics/ut_common/ut_common.cpp @@ -7,14 +7,14 @@ #include <ydb/core/tx/tx_proxy/proxy.h> #include <ydb/core/testlib/actors/wait_events.h> +#include <ydb/core/testlib/tenant_helpers.h> -// TODO remove SDK -#include <ydb/public/sdk/cpp/client/ydb_result/result.h> -#include <ydb/public/sdk/cpp/client/ydb_table/table.h> -#include <ydb/public/sdk/cpp/client/ydb_scheme/scheme.h> +#include <ydb/public/api/grpc/ydb_cms_v1.grpc.pb.h> +#include <ydb/public/api/grpc/ydb_operation_v1.grpc.pb.h> +#include <ydb/public/api/grpc/ydb_scripting_v1.grpc.pb.h> -// TODO remove thread -#include <thread> +#include <ydb/core/grpc_services/base/base.h> +#include <ydb/core/grpc_services/local_rpc/local_rpc.h> using namespace NYdb; using namespace NYdb::NTable; @@ -23,30 +23,9 @@ using namespace NYdb::NScheme; namespace NKikimr { namespace NStat { -NKikimrSubDomains::TSubDomainSettings GetSubDomainDeclareSettings(const TString &name, const TStoragePools &pools) { - NKikimrSubDomains::TSubDomainSettings subdomain; - subdomain.SetName(name); - for (auto& pool: pools) { - *subdomain.AddStoragePools() = pool; - } - return subdomain; -} - -NKikimrSubDomains::TSubDomainSettings GetSubDomainDefaultSettings(const TString &name, const TStoragePools &pools) { - NKikimrSubDomains::TSubDomainSettings subdomain; - subdomain.SetName(name); - subdomain.SetCoordinators(1); - subdomain.SetMediators(1); - subdomain.SetPlanResolution(50); - subdomain.SetTimeCastBucketsPerMediator(2); - for (auto& pool: pools) { - *subdomain.AddStoragePools() = pool; - } - return subdomain; -} - -TTestEnv::TTestEnv(ui32 staticNodes, ui32 dynamicNodes, ui32 storagePools, bool useRealThreads) - : CSController(NYDBTest::TControllers::RegisterCSControllerGuard<NYDBTest::NColumnShard::TController>()) { +TTestEnv::TTestEnv(ui32 staticNodes, ui32 dynamicNodes, bool useRealThreads) + : CSController(NYDBTest::TControllers::RegisterCSControllerGuard<NYDBTest::NColumnShard::TController>()) +{ auto mbusPort = PortManager.GetPort(); auto grpcPort = PortManager.GetPort(); @@ -55,17 +34,14 @@ TTestEnv::TTestEnv(ui32 staticNodes, ui32 dynamicNodes, ui32 storagePools, bool Settings->SetNodeCount(staticNodes); Settings->SetDynamicNodeCount(dynamicNodes); Settings->SetUseRealThreads(useRealThreads); + Settings->AddStoragePoolType("hdd1"); + Settings->AddStoragePoolType("hdd2"); NKikimrConfig::TFeatureFlags featureFlags; featureFlags.SetEnableStatistics(true); featureFlags.SetEnableColumnStatistics(true); Settings->SetFeatureFlags(featureFlags); - for (ui32 i : xrange(storagePools)) { - TString poolName = Sprintf("test%d", i); - Settings->AddStoragePool(poolName, TString("/Root:") + poolName, 2); - } - Server = new Tests::TServer(*Settings); Server->EnableGRpc(grpcPort); @@ -91,45 +67,69 @@ TTestEnv::~TTestEnv() { Driver->Stop(true); } -TStoragePools TTestEnv::GetPools() const { - TStoragePools pools; - for (const auto& [kind, pool] : Settings->StoragePoolTypes) { - pools.emplace_back(pool.GetName(), kind); +void CreateDatabase(TTestEnv& env, const TString& databaseName, + size_t nodeCount, bool isShared, const TString& poolName) +{ + auto& runtime = *env.GetServer().GetRuntime(); + auto fullDbName = Sprintf("/Root/%s", databaseName.c_str()); + + using TEvCreateDatabaseRequest = NKikimr::NGRpcService::TGrpcRequestOperationCall< + Ydb::Cms::CreateDatabaseRequest, + Ydb::Cms::CreateDatabaseResponse>; + + Ydb::Cms::CreateDatabaseRequest request; + request.set_path(fullDbName); + if (isShared) { + auto* resources = request.mutable_shared_resources(); + auto* storage = resources->add_storage_units(); + storage->set_unit_kind(poolName); + storage->set_count(1); + } else { + auto* resources = request.mutable_resources(); + auto* storage = resources->add_storage_units(); + storage->set_unit_kind(poolName); + storage->set_count(1); } - return pools; -} -void CreateDatabase(TTestEnv& env, const TString& databaseName, size_t nodeCount) { - auto subdomain = GetSubDomainDeclareSettings(databaseName); - UNIT_ASSERT_VALUES_EQUAL(NMsgBusProxy::MSTATUS_OK, - env.GetClient().CreateExtSubdomain("/Root", subdomain)); + auto future = NRpcService::DoLocalRpc<TEvCreateDatabaseRequest>( + std::move(request), "", "", runtime.GetActorSystem(0)); + auto response = runtime.WaitFuture(std::move(future)); + UNIT_ASSERT(response.operation().ready()); + UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS); - env.GetTenants().Run("/Root/" + databaseName, nodeCount); + env.GetTenants().Run(fullDbName, nodeCount); - auto subdomainSettings = GetSubDomainDefaultSettings(databaseName, env.GetPools()); - subdomainSettings.SetExternalSchemeShard(true); - subdomainSettings.SetExternalStatisticsAggregator(true); - UNIT_ASSERT_VALUES_EQUAL(NMsgBusProxy::MSTATUS_OK, - env.GetClient().AlterExtSubdomain("/Root", subdomainSettings)); + if (!env.GetServer().GetSettings().UseRealThreads) { + runtime.SimulateSleep(TDuration::Seconds(1)); + } } -void CreateServerlessDatabase(TTestEnv& env, const TString& databaseName, TPathId resourcesDomainKey) { - auto subdomain = GetSubDomainDeclareSettings(databaseName); - subdomain.MutableResourcesDomainKey()->SetSchemeShard(resourcesDomainKey.OwnerId); - subdomain.MutableResourcesDomainKey()->SetPathId(resourcesDomainKey.LocalPathId); - UNIT_ASSERT_VALUES_EQUAL(NMsgBusProxy::MSTATUS_OK, - env.GetClient().CreateExtSubdomain("/Root", subdomain)); +void CreateServerlessDatabase(TTestEnv& env, const TString& databaseName, const TString& sharedName) { + auto& runtime = *env.GetServer().GetRuntime(); + auto fullDbName = Sprintf("/Root/%s", databaseName.c_str()); + + using TEvCreateDatabaseRequest = NKikimr::NGRpcService::TGrpcRequestOperationCall< + Ydb::Cms::CreateDatabaseRequest, + Ydb::Cms::CreateDatabaseResponse>; + + Ydb::Cms::CreateDatabaseRequest request; + request.set_path(fullDbName); + request.mutable_serverless_resources()->set_shared_database_path(sharedName); - env.GetTenants().Run("/Root/" + databaseName, 0); + auto future = NRpcService::DoLocalRpc<TEvCreateDatabaseRequest>( + std::move(request), "", "", runtime.GetActorSystem(0)); + auto response = runtime.WaitFuture(std::move(future)); + UNIT_ASSERT(response.operation().ready()); + UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS); - auto subdomainSettings = GetSubDomainDefaultSettings(databaseName, env.GetPools()); - subdomainSettings.SetExternalSchemeShard(true); - UNIT_ASSERT_VALUES_EQUAL(NMsgBusProxy::MSTATUS_OK, - env.GetClient().AlterExtSubdomain("/Root", subdomainSettings)); + env.GetTenants().Run(fullDbName, 0); + + if (!env.GetServer().GetSettings().UseRealThreads) { + runtime.SimulateSleep(TDuration::Seconds(1)); + } } -TPathId ResolvePathId(TTestActorRuntime& runtime, const TString& path, TPathId* domainKey, ui64* saTabletId) -{ +TPathId ResolvePathId(TTestActorRuntime& runtime, const TString& path, TPathId* domainKey, ui64* saTabletId) { auto sender = runtime.AllocateEdgeActor(); using TNavigate = NSchemeCache::TSchemeCacheNavigate; @@ -184,8 +184,7 @@ TPathId ResolvePathId(TTestActorRuntime& runtime, const TString& path, TPathId* return resultEntry.TableId.PathId; } -NKikimrScheme::TEvDescribeSchemeResult DescribeTable(TTestActorRuntime& runtime, TActorId sender, const TString& path) -{ +NKikimrScheme::TEvDescribeSchemeResult DescribeTable(TTestActorRuntime& runtime, TActorId sender, const TString& path) { TAutoPtr<IEventHandle> handle; auto request = MakeHolder<TEvTxUserProxy::TEvNavigate>(); @@ -197,8 +196,7 @@ NKikimrScheme::TEvDescribeSchemeResult DescribeTable(TTestActorRuntime& runtime, return *reply->MutableRecord(); } -TVector<ui64> GetTableShards(TTestActorRuntime& runtime, TActorId sender, const TString& path) -{ +TVector<ui64> GetTableShards(TTestActorRuntime& runtime, TActorId sender, const TString& path) { TVector<ui64> shards; auto lsResult = DescribeTable(runtime, sender, path); for (auto &part : lsResult.GetPathDescription().GetTablePartitions()) @@ -207,8 +205,7 @@ TVector<ui64> GetTableShards(TTestActorRuntime& runtime, TActorId sender, const return shards; } -TVector<ui64> GetColumnTableShards(TTestActorRuntime& runtime, TActorId sender, const TString& path) -{ +TVector<ui64> GetColumnTableShards(TTestActorRuntime& runtime, TActorId sender, const TString& path) { TVector<ui64> shards; auto lsResult = DescribeTable(runtime, sender, path); for (auto &part : lsResult.GetPathDescription().GetColumnTableDescription().GetSharding().GetColumnShards()) @@ -217,19 +214,36 @@ TVector<ui64> GetColumnTableShards(TTestActorRuntime& runtime, TActorId sender, return shards; } -void CreateUniformTable(TTestEnv& env, const TString& databaseName, const TString& tableName) { - TTableClient client(env.GetDriver()); - auto session = client.CreateSession().GetValueSync().GetSession(); +Ydb::StatusIds::StatusCode ExecuteYqlScript(TTestEnv& env, const TString& script, bool mustSucceed) { + auto& runtime = *env.GetServer().GetRuntime(); + + using TEvExecuteYqlRequest = NGRpcService::TGrpcRequestOperationCall< + Ydb::Scripting::ExecuteYqlRequest, + Ydb::Scripting::ExecuteYqlResponse>; - auto result = session.ExecuteSchemeQuery(Sprintf(R"( + Ydb::Scripting::ExecuteYqlRequest request; + request.set_script(script); + + auto future = NRpcService::DoLocalRpc<TEvExecuteYqlRequest>( + std::move(request), "", "", runtime.GetActorSystem(0)); + auto response = runtime.WaitFuture(std::move(future)); + + UNIT_ASSERT(response.operation().ready()); + if (mustSucceed) { + UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS); + } + return response.operation().status(); +} + +void CreateUniformTable(TTestEnv& env, const TString& databaseName, const TString& tableName) { + ExecuteYqlScript(env, Sprintf(R"( CREATE TABLE `Root/%s/%s` ( Key Uint64, Value Uint64, PRIMARY KEY (Key) ) WITH ( UNIFORM_PARTITIONS = 4 ); - )", databaseName.c_str(), tableName.c_str())).GetValueSync(); - UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + )", databaseName.c_str(), tableName.c_str())); TStringBuilder replace; replace << Sprintf("REPLACE INTO `Root/%s/%s` (Key, Value) VALUES ", @@ -242,18 +256,16 @@ void CreateUniformTable(TTestEnv& env, const TString& databaseName, const TStrin replace << Sprintf("(%" PRIu64 "ul, %" PRIu64 "ul)", value, value); } replace << ";"; - result = session.ExecuteDataQuery(replace, TTxControl::BeginTx().CommitTx()).GetValueSync(); - UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + ExecuteYqlScript(env, replace); } void CreateColumnStoreTable(TTestEnv& env, const TString& databaseName, const TString& tableName, int shardCount) { - TTableClient client(env.GetDriver()); - auto session = client.CreateSession().GetValueSync().GetSession(); - auto fullTableName = Sprintf("Root/%s/%s", databaseName.c_str(), tableName.c_str()); - auto result = session.ExecuteSchemeQuery(Sprintf(R"( + auto& runtime = *env.GetServer().GetRuntime(); + + ExecuteYqlScript(env, Sprintf(R"( CREATE TABLE `%s` ( Key Uint64 NOT NULL, Value Uint64, @@ -264,35 +276,50 @@ void CreateColumnStoreTable(TTestEnv& env, const TString& databaseName, const TS STORE = COLUMN, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = %d ); - )", fullTableName.c_str(), shardCount)).GetValueSync(); - UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + )", fullTableName.c_str(), shardCount)); + runtime.SimulateSleep(TDuration::Seconds(1)); - result = session.ExecuteSchemeQuery(Sprintf(R"( + ExecuteYqlScript(env, Sprintf(R"( ALTER OBJECT `%s` (TYPE TABLE) SET (ACTION=UPSERT_INDEX, NAME=cms_key, TYPE=COUNT_MIN_SKETCH, FEATURES=`{"column_names" : ['Key']}`); - )", fullTableName.c_str())).GetValueSync(); - UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + )", fullTableName.c_str())); + runtime.SimulateSleep(TDuration::Seconds(1)); - result = session.ExecuteSchemeQuery(Sprintf(R"( + ExecuteYqlScript(env, Sprintf(R"( ALTER OBJECT `%s` (TYPE TABLE) SET (ACTION=UPSERT_INDEX, NAME=cms_value, TYPE=COUNT_MIN_SKETCH, FEATURES=`{"column_names" : ['Value']}`); - )", fullTableName.c_str())).GetValueSync(); - UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); - - NYdb::TValueBuilder rows; - rows.BeginList(); + )", fullTableName.c_str())); + runtime.SimulateSleep(TDuration::Seconds(1)); + + using TEvBulkUpsertRequest = NGRpcService::TGrpcRequestOperationCall< + Ydb::Table::BulkUpsertRequest, + Ydb::Table::BulkUpsertResponse>; + + Ydb::Table::BulkUpsertRequest request; + request.set_table(fullTableName); + auto* rows = request.mutable_rows(); + + auto* reqRowType = rows->mutable_type()->mutable_list_type()->mutable_item()->mutable_struct_type(); + auto* reqKeyType = reqRowType->add_members(); + reqKeyType->set_name("Key"); + reqKeyType->mutable_type()->set_type_id(Ydb::Type::UINT64); + auto* reqValueType = reqRowType->add_members(); + reqValueType->set_name("Value"); + reqValueType->mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::UINT64); + + auto* reqRows = rows->mutable_value(); for (size_t i = 0; i < ColumnTableRowsNumber; ++i) { - auto key = TValueBuilder().Uint64(i).Build(); - auto value = TValueBuilder().OptionalUint64(i).Build(); - rows.AddListItem(); - rows.BeginStruct(); - rows.AddMember("Key", key); - rows.AddMember("Value", value); - rows.EndStruct(); + auto* row = reqRows->add_items(); + row->add_items()->set_uint64_value(i); + row->add_items()->set_uint64_value(i); } - rows.EndList(); - result = client.BulkUpsert(fullTableName, rows.Build()).GetValueSync(); - UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + + auto future = NRpcService::DoLocalRpc<TEvBulkUpsertRequest>( + std::move(request), "", "", runtime.GetActorSystem(0)); + auto response = runtime.WaitFuture(std::move(future)); + + UNIT_ASSERT(response.operation().ready()); + UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS); env.GetController()->WaitActualization(TDuration::Seconds(1)); } @@ -313,57 +340,30 @@ std::vector<TTableInfo> GatherColumnTablesInfo(TTestEnv& env, ui8 tableCount) { } std::vector<TTableInfo> CreateDatabaseColumnTables(TTestEnv& env, ui8 tableCount, ui8 shardCount) { - auto init = [&] () { - CreateDatabase(env, "Database"); - for (ui8 tableId = 1; tableId <= tableCount; tableId++) { - CreateColumnStoreTable(env, "Database", Sprintf("Table%u", tableId), shardCount); - } - }; - std::thread initThread(init); + CreateDatabase(env, "Database"); - auto& runtime = *env.GetServer().GetRuntime(); - - runtime.SimulateSleep(TDuration::Seconds(10)); - initThread.join(); + for (ui8 tableId = 1; tableId <= tableCount; tableId++) { + CreateColumnStoreTable(env, "Database", Sprintf("Table%u", tableId), shardCount); + } return GatherColumnTablesInfo(env, tableCount); } std::vector<TTableInfo> CreateServerlessDatabaseColumnTables(TTestEnv& env, ui8 tableCount, ui8 shardCount) { - auto init = [&] () { - CreateDatabase(env, "Shared"); - }; - std::thread initThread(init); - - auto& runtime = *env.GetServer().GetRuntime(); - runtime.SimulateSleep(TDuration::Seconds(5)); - initThread.join(); + CreateDatabase(env, "Shared", 1, true); + CreateServerlessDatabase(env, "Database", "/Root/Shared"); - TPathId domainKey; - ResolvePathId(runtime, "/Root/Shared", &domainKey); - - auto init2 = [&] () { - CreateServerlessDatabase(env, "Database", domainKey); - for (ui8 tableId = 1; tableId <= tableCount; tableId++) { - CreateColumnStoreTable(env, "Database", Sprintf("Table%u", tableId), shardCount); - } - }; - std::thread init2Thread(init2); - - runtime.SimulateSleep(TDuration::Seconds(5)); - init2Thread.join(); + for (ui8 tableId = 1; tableId <= tableCount; tableId++) { + CreateColumnStoreTable(env, "Database", Sprintf("Table%u", tableId), shardCount); + } return GatherColumnTablesInfo(env, tableCount); } void DropTable(TTestEnv& env, const TString& databaseName, const TString& tableName) { - TTableClient client(env.GetDriver()); - auto session = client.CreateSession().GetValueSync().GetSession(); - - auto result = session.ExecuteSchemeQuery(Sprintf(R"( + ExecuteYqlScript(env, Sprintf(R"( DROP TABLE `Root/%s/%s`; - )", databaseName.c_str(), tableName.c_str())).GetValueSync(); - UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + )", databaseName.c_str(), tableName.c_str())); } std::shared_ptr<TCountMinSketch> ExtractCountMin(TTestActorRuntime& runtime, const TPathId& pathId, ui64 columnTag) { diff --git a/ydb/core/statistics/ut_common/ut_common.h b/ydb/core/statistics/ut_common/ut_common.h index 47132fc873..069fe3eaf8 100644 --- a/ydb/core/statistics/ut_common/ut_common.h +++ b/ydb/core/statistics/ut_common/ut_common.h @@ -16,15 +16,9 @@ namespace NStat { static constexpr ui32 ColumnTableRowsNumber = 1000; -NKikimrSubDomains::TSubDomainSettings GetSubDomainDeclareSettings( - const TString &name, const TStoragePools &pools = {}); - -NKikimrSubDomains::TSubDomainSettings GetSubDomainDefaultSettings( - const TString &name, const TStoragePools &pools = {}); - class TTestEnv { public: - TTestEnv(ui32 staticNodes = 1, ui32 dynamicNodes = 1, ui32 storagePools = 1, bool useRealThreads = false); + TTestEnv(ui32 staticNodes = 1, ui32 dynamicNodes = 1, bool useRealThreads = false); ~TTestEnv(); Tests::TServer& GetServer() const { @@ -51,8 +45,6 @@ public: return Settings; } - TStoragePools GetPools() const; - auto& GetController() { return CSController; } @@ -71,9 +63,12 @@ private: NYDBTest::TControllers::TGuard<NYDBTest::NColumnShard::TController> CSController; }; -void CreateDatabase(TTestEnv& env, const TString& databaseName, size_t nodeCount = 1); +Ydb::StatusIds::StatusCode ExecuteYqlScript(TTestEnv& env, const TString& script, bool mustSucceed = true); + +void CreateDatabase(TTestEnv& env, const TString& databaseName, + size_t nodeCount = 1, bool isShared = false, const TString& poolName = "hdd1"); -void CreateServerlessDatabase(TTestEnv& env, const TString& databaseName, TPathId resourcesDomainKey); +void CreateServerlessDatabase(TTestEnv& env, const TString& databaseName, const TString& sharedName); struct TTableInfo { std::vector<ui64> ShardIds; diff --git a/ydb/library/table_creator/table_creator.cpp b/ydb/library/table_creator/table_creator.cpp index 2f96883bff..7a3b9abe1f 100644 --- a/ydb/library/table_creator/table_creator.cpp +++ b/ydb/library/table_creator/table_creator.cpp @@ -35,6 +35,7 @@ public: TVector<TString> keyColumns, NKikimrServices::EServiceKikimr logService, TMaybe<NKikimrSchemeOp::TTTLSettings> ttlSettings = Nothing(), + const TString& database = {}, bool isSystemUser = false, TMaybe<NKikimrSchemeOp::TPartitioningPolicy> partitioningPolicy = Nothing()) : PathComponents(std::move(pathComponents)) @@ -42,6 +43,7 @@ public: , KeyColumns(std::move(keyColumns)) , LogService(logService) , TtlSettings(std::move(ttlSettings)) + , Database(database) , IsSystemUser(isSystemUser) , PartitioningPolicy(std::move(partitioningPolicy)) , LogPrefix("Table " + TableName() + " updater. ") @@ -73,19 +75,22 @@ public: void Bootstrap() { Become(&TTableCreator::StateFuncCheck); + if (!Database) { + Database = AppData()->TenantName; + } CheckTableExistence(); } void CheckTableExistence() { Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(NTableCreator::BuildSchemeCacheNavigateRequest( - {PathComponents} + {PathComponents}, Database ).Release()), IEventHandle::FlagTrackDelivery); } void RunTableRequest() { auto request = MakeHolder<TEvTxUserProxy::TEvProposeTransaction>(); NKikimrSchemeOp::TModifyScheme& modifyScheme = *request->Record.MutableTransaction()->MutableModifyScheme(); - auto pathComponents = SplitPath(AppData()->TenantName); + auto pathComponents = SplitPath(Database); for (size_t i = 0; i < PathComponents.size() - 1; ++i) { pathComponents.emplace_back(PathComponents[i]); } @@ -388,6 +393,7 @@ private: const TVector<TString> KeyColumns; NKikimrServices::EServiceKikimr LogService; const TMaybe<NKikimrSchemeOp::TTTLSettings> TtlSettings; + TString Database; bool IsSystemUser = false; const TMaybe<NKikimrSchemeOp::TPartitioningPolicy> PartitioningPolicy; NKikimrSchemeOp::EOperationType OperationType = NKikimrSchemeOp::EOperationType::ESchemeOpCreateTable; @@ -422,8 +428,10 @@ THolder<NSchemeCache::TSchemeCacheNavigate> BuildSchemeCacheNavigateRequest(cons return request; } -THolder<NSchemeCache::TSchemeCacheNavigate> BuildSchemeCacheNavigateRequest(const TVector<TVector<TString>>& pathsComponents) { - return BuildSchemeCacheNavigateRequest(pathsComponents, AppData()->TenantName, nullptr); +THolder<NSchemeCache::TSchemeCacheNavigate> BuildSchemeCacheNavigateRequest( + const TVector<TVector<TString>>& pathsComponents, const TString& database) +{ + return BuildSchemeCacheNavigateRequest(pathsComponents, database ? database : AppData()->TenantName, nullptr); } NKikimrSchemeOp::TColumnDescription TMultiTableCreator::Col(const TString& columnName, const char* columnType) { @@ -488,12 +496,13 @@ NActors::IActor* CreateTableCreator( TVector<TString> keyColumns, NKikimrServices::EServiceKikimr logService, TMaybe<NKikimrSchemeOp::TTTLSettings> ttlSettings, + const TString& database, bool isSystemUser, TMaybe<NKikimrSchemeOp::TPartitioningPolicy> partitioningPolicy) { return new TTableCreator(std::move(pathComponents), std::move(columns), - std::move(keyColumns), logService, std::move(ttlSettings), isSystemUser, - std::move(partitioningPolicy)); + std::move(keyColumns), logService, std::move(ttlSettings), database, + isSystemUser, std::move(partitioningPolicy)); } } // namespace NKikimr diff --git a/ydb/library/table_creator/table_creator.h b/ydb/library/table_creator/table_creator.h index 39e40d4280..c036e4a6cb 100644 --- a/ydb/library/table_creator/table_creator.h +++ b/ydb/library/table_creator/table_creator.h @@ -67,7 +67,7 @@ private: }; THolder<NSchemeCache::TSchemeCacheNavigate> BuildSchemeCacheNavigateRequest(const TVector<TVector<TString>>& pathsComponents, const TString& database, TIntrusiveConstPtr<NACLib::TUserToken> userToken); -THolder<NSchemeCache::TSchemeCacheNavigate> BuildSchemeCacheNavigateRequest(const TVector<TVector<TString>>& pathsComponents); +THolder<NSchemeCache::TSchemeCacheNavigate> BuildSchemeCacheNavigateRequest(const TVector<TVector<TString>>& pathsComponents, const TString& database = {}); } // namespace NTableCreator @@ -77,6 +77,7 @@ NActors::IActor* CreateTableCreator( TVector<TString> keyColumns, NKikimrServices::EServiceKikimr logService, TMaybe<NKikimrSchemeOp::TTTLSettings> ttlSettings = Nothing(), + const TString& database = {}, bool isSystemUser = false, TMaybe<NKikimrSchemeOp::TPartitioningPolicy> partitioningPolicy = Nothing()); |