diff options
author | tesseract <tesseract@yandex-team.com> | 2023-03-07 17:48:04 +0300 |
---|---|---|
committer | tesseract <tesseract@yandex-team.com> | 2023-03-07 17:48:04 +0300 |
commit | fbe3047276b6f2f70eff2930af9e4b8251d8d9af (patch) | |
tree | 248e65f55401225d7b4ed1db6f2ccf48b7c328e0 | |
parent | 9a17e39c1c58a9e4b00b5792e70177c9520103d5 (diff) | |
download | ydb-fbe3047276b6f2f70eff2930af9e4b8251d8d9af.tar.gz |
Сделать persist статистики о топиках в SchemeShard-е
7 files changed, 178 insertions, 3 deletions
diff --git a/ydb/core/tx/schemeshard/schemeshard__init.cpp b/ydb/core/tx/schemeshard/schemeshard__init.cpp index afc3caa417..c52b0d0c72 100644 --- a/ydb/core/tx/schemeshard/schemeshard__init.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__init.cpp @@ -2485,6 +2485,34 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> { } } + // Read PersQueue groups stats + { + auto rowset = db.Table<Schema::PersQueueGroupStats>().Range().Select(); + if (!rowset.IsReady()) { + return false; + } + while (!rowset.EndOfSet()) { + TLocalPathId localPathId = rowset.GetValue<Schema::PersQueueGroupStats::PathId>(); + TPathId pathId(selfId, localPathId); + + auto it = Self->Topics.find(pathId); + if (it == Self->Topics.end()) { + continue; + } + + auto& topic = it->second; + topic->Stats.SeqNo = TMessageSeqNo(rowset.GetValue<Schema::PersQueueGroupStats::SeqNoGeneration>(), rowset.GetValue<Schema::PersQueueGroupStats::SeqNoRound>()); + topic->Stats.DataSize = rowset.GetValue<Schema::PersQueueGroupStats::DataSize>(); + topic->Stats.UsedReserveSize = rowset.GetValue<Schema::PersQueueGroupStats::UsedReserveSize>(); + + Self->ResolveDomainInfo(pathId)->AggrDiskSpaceUsage(topic->Stats, {}); + + if (!rowset.Next()) { + return false; + } + } + } + // Read RTMR volumes { diff --git a/ydb/core/tx/schemeshard/schemeshard__pq_stats.cpp b/ydb/core/tx/schemeshard/schemeshard__pq_stats.cpp index 8193d85f03..7ea2bce360 100644 --- a/ydb/core/tx/schemeshard/schemeshard__pq_stats.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__pq_stats.cpp @@ -47,9 +47,11 @@ bool TTxStoreTopicStats::PersistSingleStats(const TPathId& pathId, const TStatsQ oldStats = newStats; - if (subDomainInfo->CheckDiskSpaceQuotas(Self)) { - NIceDb::TNiceDb db(txc.DB); + NIceDb::TNiceDb db(txc.DB); + + Self->PersistPersQueueGroupStats(db, pathId, newStats); + if (subDomainInfo->CheckDiskSpaceQuotas(Self)) { auto subDomainId = Self->ResolvePathIdForDomain(pathId); Self->PersistSubDomainState(db, subDomainId, *subDomainInfo); diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp index b061571cce..96cf92f744 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp @@ -2336,6 +2336,16 @@ void TSchemeShard::PersistTablePartitionStats(NIceDb::TNiceDb& db, const TPathId } } +void TSchemeShard::PersistPersQueueGroupStats(NIceDb::TNiceDb &db, const TPathId pathId, const TTopicStats& stats) { + db.Table<Schema::PersQueueGroupStats>().Key(pathId.LocalPathId).Update( + NIceDb::TUpdate<Schema::PersQueueGroupStats::SeqNoGeneration>(stats.SeqNo.Generation), + NIceDb::TUpdate<Schema::PersQueueGroupStats::SeqNoRound>(stats.SeqNo.Round), + + NIceDb::TUpdate<Schema::PersQueueGroupStats::DataSize>(stats.DataSize), + NIceDb::TUpdate<Schema::PersQueueGroupStats::UsedReserveSize>(stats.UsedReserveSize) + ); +} + void TSchemeShard::PersistTableAlterVersion(NIceDb::TNiceDb& db, const TPathId pathId, const TTableInfo::TPtr tableInfo) { if (pathId.OwnerId == TabletID()) { db.Table<Schema::Tables>().Key(pathId.LocalPathId).Update( diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.h b/ydb/core/tx/schemeshard/schemeshard_impl.h index 4a8f33b8b5..3f4656cc4b 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.h +++ b/ydb/core/tx/schemeshard/schemeshard_impl.h @@ -593,6 +593,7 @@ public: void PersistTableAltered(NIceDb::TNiceDb &db, const TPathId pathId, const TTableInfo::TPtr tableInfo); void PersistAddAlterTable(NIceDb::TNiceDb& db, TPathId pathId, const TTableInfo::TAlterDataPtr alter); void PersistPersQueueGroup(NIceDb::TNiceDb &db, TPathId pathId, const TTopicInfo::TPtr); + void PersistPersQueueGroupStats(NIceDb::TNiceDb &db, const TPathId pathId, const TTopicStats& stats); void PersistRemovePersQueueGroup(NIceDb::TNiceDb &db, TPathId pathId); void PersistAddPersQueueGroupAlter(NIceDb::TNiceDb &db, TPathId pathId, const TTopicInfo::TPtr); void PersistRemovePersQueueGroupAlter(NIceDb::TNiceDb &db, TPathId pathId); diff --git a/ydb/core/tx/schemeshard/schemeshard_schema.h b/ydb/core/tx/schemeshard/schemeshard_schema.h index 8e2e2a1879..9736e6ff83 100644 --- a/ydb/core/tx/schemeshard/schemeshard_schema.h +++ b/ydb/core/tx/schemeshard/schemeshard_schema.h @@ -1646,6 +1646,19 @@ struct Schema : NIceDb::Schema { using TColumns = TableColumns<OwnerPathId, LocalPathId, AlterVersion, SourceType, Location, Installation, Auth, ExternalTableReferences>; }; + struct PersQueueGroupStats : Table<106> { + struct PathId : Column<1, NScheme::NTypeIds::Uint64> {}; + + struct SeqNoGeneration : Column<2, NScheme::NTypeIds::Uint64> {}; + struct SeqNoRound : Column<3, NScheme::NTypeIds::Uint64> {}; + + struct DataSize : Column<4, NScheme::NTypeIds::Uint64> {}; + struct UsedReserveSize : Column<5, NScheme::NTypeIds::Uint64> {}; + + using TKey = TableKey<PathId>; + using TColumns = TableColumns<PathId, SeqNoGeneration, SeqNoRound, DataSize, UsedReserveSize>; + }; + using TTables = SchemaTables< Paths, TxInFlight, @@ -1750,7 +1763,8 @@ struct Schema : NIceDb::Schema { BlobDepots, CdcStreamScanShardStatus, ExternalTable, - ExternalDataSource + ExternalDataSource, + PersQueueGroupStats >; static constexpr ui64 SysParam_NextPathId = 1; diff --git a/ydb/core/tx/schemeshard/ut_stats.cpp b/ydb/core/tx/schemeshard/ut_stats.cpp index 51029221ef..b0fd243a03 100644 --- a/ydb/core/tx/schemeshard/ut_stats.cpp +++ b/ydb/core/tx/schemeshard/ut_stats.cpp @@ -505,4 +505,65 @@ Y_UNIT_TEST_SUITE(TSchemeshardStatsBatchingTest) { Assert(69, 0); // 67 - it is unstable value. it can change if internal message store change } + + Y_UNIT_TEST(PeriodicTopicStatsReload) { + TTestBasicRuntime runtime; + TTestEnv env(runtime); + + runtime.SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_TRACE); + + auto& appData = runtime.GetAppData(); + + ui64 txId = 100; + + // disable batching + appData.SchemeShardConfig.SetStatsBatchTimeoutMs(0); + appData.SchemeShardConfig.SetStatsMaxBatchSize(0); + + // apply config via reboot + TActorId sender = runtime.AllocateEdgeActor(); + + GracefulRestartTablet(runtime, TTestTxConfig::SchemeShard, sender); + + const auto AssertTopicSize = [&] (ui64 expectedAccountSize, ui64 expectedUsedReserveSize) { + TestDescribeResult(DescribePath(runtime, "/MyRoot/Topic1"), + {NLs::Finished, + NLs::TopicAccountSize(expectedAccountSize), + NLs::TopicUsedReserveSize(expectedUsedReserveSize)}); + }; + + TestCreatePQGroup(runtime, ++txId, "/MyRoot", R"( + Name: "Topic1" + TotalGroupCount: 1 + PartitionPerTablet: 1 + PQTabletConfig { + PartitionConfig { + LifetimeSeconds: 1 + WriteSpeedInBytesPerSecond : 7 + + } + MeteringMode: METERING_MODE_RESERVED_CAPACITY + } + )"); + env.TestWaitNotification(runtime, txId); + AssertTopicSize(7, 0); + + + ui64 topic1Id = DescribePath(runtime, "/MyRoot/Topic1").GetPathDescription().GetSelf().GetPathId(); + + ui64 generation = 1; + ui64 round = 97; + + SendTEvPeriodicTopicStats(runtime, topic1Id, generation, round, 17, 7); + AssertTopicSize(17, 7); + + GracefulRestartTablet(runtime, TTestTxConfig::SchemeShard, sender); + + AssertTopicSize(17, 7); // loaded from db + + SendTEvPeriodicTopicStats(runtime, topic1Id, generation, round - 1, 19, 7); + + AssertTopicSize(17, 7); // not changed because round is less + } + }; diff --git a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema index 1e2fe85de6..8a93d30cc3 100644 --- a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema +++ b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema @@ -6920,5 +6920,64 @@ "Blobs": 1 } } + }, + { + "TableId": 106, + "TableName": "PersQueueGroupStats", + "TableKey": [ + 1 + ], + "ColumnsAdded": [ + { + "ColumnId": 1, + "ColumnName": "PathId", + "ColumnType": "Uint64" + }, + { + "ColumnId": 2, + "ColumnName": "SeqNoGeneration", + "ColumnType": "Uint64" + }, + { + "ColumnId": 3, + "ColumnName": "SeqNoRound", + "ColumnType": "Uint64" + }, + { + "ColumnId": 4, + "ColumnName": "DataSize", + "ColumnType": "Uint64" + }, + { + "ColumnId": 5, + "ColumnName": "UsedReserveSize", + "ColumnType": "Uint64" + } + ], + "ColumnsDropped": [], + "ColumnFamilies": { + "0": { + "Columns": [ + 1, + 2, + 3, + 4, + 5 + ], + "RoomID": 0, + "Codec": 0, + "InMemory": false, + "Cache": 0, + "Small": 4294967295, + "Large": 4294967295 + } + }, + "Rooms": { + "0": { + "Main": 1, + "Outer": 1, + "Blobs": 1 + } + } } ]
\ No newline at end of file |