aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authortesseract <tesseract@yandex-team.com>2023-03-07 17:48:04 +0300
committertesseract <tesseract@yandex-team.com>2023-03-07 17:48:04 +0300
commitfbe3047276b6f2f70eff2930af9e4b8251d8d9af (patch)
tree248e65f55401225d7b4ed1db6f2ccf48b7c328e0
parent9a17e39c1c58a9e4b00b5792e70177c9520103d5 (diff)
downloadydb-fbe3047276b6f2f70eff2930af9e4b8251d8d9af.tar.gz
Сделать persist статистики о топиках в SchemeShard-е
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__init.cpp28
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__pq_stats.cpp6
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_impl.cpp10
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_impl.h1
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_schema.h16
-rw-r--r--ydb/core/tx/schemeshard/ut_stats.cpp61
-rw-r--r--ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema59
7 files changed, 178 insertions, 3 deletions
diff --git a/ydb/core/tx/schemeshard/schemeshard__init.cpp b/ydb/core/tx/schemeshard/schemeshard__init.cpp
index afc3caa417..c52b0d0c72 100644
--- a/ydb/core/tx/schemeshard/schemeshard__init.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__init.cpp
@@ -2485,6 +2485,34 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
}
}
+ // Read PersQueue groups stats
+ {
+ auto rowset = db.Table<Schema::PersQueueGroupStats>().Range().Select();
+ if (!rowset.IsReady()) {
+ return false;
+ }
+ while (!rowset.EndOfSet()) {
+ TLocalPathId localPathId = rowset.GetValue<Schema::PersQueueGroupStats::PathId>();
+ TPathId pathId(selfId, localPathId);
+
+ auto it = Self->Topics.find(pathId);
+ if (it == Self->Topics.end()) {
+ continue;
+ }
+
+ auto& topic = it->second;
+ topic->Stats.SeqNo = TMessageSeqNo(rowset.GetValue<Schema::PersQueueGroupStats::SeqNoGeneration>(), rowset.GetValue<Schema::PersQueueGroupStats::SeqNoRound>());
+ topic->Stats.DataSize = rowset.GetValue<Schema::PersQueueGroupStats::DataSize>();
+ topic->Stats.UsedReserveSize = rowset.GetValue<Schema::PersQueueGroupStats::UsedReserveSize>();
+
+ Self->ResolveDomainInfo(pathId)->AggrDiskSpaceUsage(topic->Stats, {});
+
+ if (!rowset.Next()) {
+ return false;
+ }
+ }
+ }
+
// Read RTMR volumes
{
diff --git a/ydb/core/tx/schemeshard/schemeshard__pq_stats.cpp b/ydb/core/tx/schemeshard/schemeshard__pq_stats.cpp
index 8193d85f03..7ea2bce360 100644
--- a/ydb/core/tx/schemeshard/schemeshard__pq_stats.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__pq_stats.cpp
@@ -47,9 +47,11 @@ bool TTxStoreTopicStats::PersistSingleStats(const TPathId& pathId, const TStatsQ
oldStats = newStats;
- if (subDomainInfo->CheckDiskSpaceQuotas(Self)) {
- NIceDb::TNiceDb db(txc.DB);
+ NIceDb::TNiceDb db(txc.DB);
+
+ Self->PersistPersQueueGroupStats(db, pathId, newStats);
+ if (subDomainInfo->CheckDiskSpaceQuotas(Self)) {
auto subDomainId = Self->ResolvePathIdForDomain(pathId);
Self->PersistSubDomainState(db, subDomainId, *subDomainInfo);
diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp
index b061571cce..96cf92f744 100644
--- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp
@@ -2336,6 +2336,16 @@ void TSchemeShard::PersistTablePartitionStats(NIceDb::TNiceDb& db, const TPathId
}
}
+void TSchemeShard::PersistPersQueueGroupStats(NIceDb::TNiceDb &db, const TPathId pathId, const TTopicStats& stats) {
+ db.Table<Schema::PersQueueGroupStats>().Key(pathId.LocalPathId).Update(
+ NIceDb::TUpdate<Schema::PersQueueGroupStats::SeqNoGeneration>(stats.SeqNo.Generation),
+ NIceDb::TUpdate<Schema::PersQueueGroupStats::SeqNoRound>(stats.SeqNo.Round),
+
+ NIceDb::TUpdate<Schema::PersQueueGroupStats::DataSize>(stats.DataSize),
+ NIceDb::TUpdate<Schema::PersQueueGroupStats::UsedReserveSize>(stats.UsedReserveSize)
+ );
+}
+
void TSchemeShard::PersistTableAlterVersion(NIceDb::TNiceDb& db, const TPathId pathId, const TTableInfo::TPtr tableInfo) {
if (pathId.OwnerId == TabletID()) {
db.Table<Schema::Tables>().Key(pathId.LocalPathId).Update(
diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.h b/ydb/core/tx/schemeshard/schemeshard_impl.h
index 4a8f33b8b5..3f4656cc4b 100644
--- a/ydb/core/tx/schemeshard/schemeshard_impl.h
+++ b/ydb/core/tx/schemeshard/schemeshard_impl.h
@@ -593,6 +593,7 @@ public:
void PersistTableAltered(NIceDb::TNiceDb &db, const TPathId pathId, const TTableInfo::TPtr tableInfo);
void PersistAddAlterTable(NIceDb::TNiceDb& db, TPathId pathId, const TTableInfo::TAlterDataPtr alter);
void PersistPersQueueGroup(NIceDb::TNiceDb &db, TPathId pathId, const TTopicInfo::TPtr);
+ void PersistPersQueueGroupStats(NIceDb::TNiceDb &db, const TPathId pathId, const TTopicStats& stats);
void PersistRemovePersQueueGroup(NIceDb::TNiceDb &db, TPathId pathId);
void PersistAddPersQueueGroupAlter(NIceDb::TNiceDb &db, TPathId pathId, const TTopicInfo::TPtr);
void PersistRemovePersQueueGroupAlter(NIceDb::TNiceDb &db, TPathId pathId);
diff --git a/ydb/core/tx/schemeshard/schemeshard_schema.h b/ydb/core/tx/schemeshard/schemeshard_schema.h
index 8e2e2a1879..9736e6ff83 100644
--- a/ydb/core/tx/schemeshard/schemeshard_schema.h
+++ b/ydb/core/tx/schemeshard/schemeshard_schema.h
@@ -1646,6 +1646,19 @@ struct Schema : NIceDb::Schema {
using TColumns = TableColumns<OwnerPathId, LocalPathId, AlterVersion, SourceType, Location, Installation, Auth, ExternalTableReferences>;
};
+ struct PersQueueGroupStats : Table<106> {
+ struct PathId : Column<1, NScheme::NTypeIds::Uint64> {};
+
+ struct SeqNoGeneration : Column<2, NScheme::NTypeIds::Uint64> {};
+ struct SeqNoRound : Column<3, NScheme::NTypeIds::Uint64> {};
+
+ struct DataSize : Column<4, NScheme::NTypeIds::Uint64> {};
+ struct UsedReserveSize : Column<5, NScheme::NTypeIds::Uint64> {};
+
+ using TKey = TableKey<PathId>;
+ using TColumns = TableColumns<PathId, SeqNoGeneration, SeqNoRound, DataSize, UsedReserveSize>;
+ };
+
using TTables = SchemaTables<
Paths,
TxInFlight,
@@ -1750,7 +1763,8 @@ struct Schema : NIceDb::Schema {
BlobDepots,
CdcStreamScanShardStatus,
ExternalTable,
- ExternalDataSource
+ ExternalDataSource,
+ PersQueueGroupStats
>;
static constexpr ui64 SysParam_NextPathId = 1;
diff --git a/ydb/core/tx/schemeshard/ut_stats.cpp b/ydb/core/tx/schemeshard/ut_stats.cpp
index 51029221ef..b0fd243a03 100644
--- a/ydb/core/tx/schemeshard/ut_stats.cpp
+++ b/ydb/core/tx/schemeshard/ut_stats.cpp
@@ -505,4 +505,65 @@ Y_UNIT_TEST_SUITE(TSchemeshardStatsBatchingTest) {
Assert(69, 0); // 67 - it is unstable value. it can change if internal message store change
}
+
+ Y_UNIT_TEST(PeriodicTopicStatsReload) {
+ TTestBasicRuntime runtime;
+ TTestEnv env(runtime);
+
+ runtime.SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_TRACE);
+
+ auto& appData = runtime.GetAppData();
+
+ ui64 txId = 100;
+
+ // disable batching
+ appData.SchemeShardConfig.SetStatsBatchTimeoutMs(0);
+ appData.SchemeShardConfig.SetStatsMaxBatchSize(0);
+
+ // apply config via reboot
+ TActorId sender = runtime.AllocateEdgeActor();
+
+ GracefulRestartTablet(runtime, TTestTxConfig::SchemeShard, sender);
+
+ const auto AssertTopicSize = [&] (ui64 expectedAccountSize, ui64 expectedUsedReserveSize) {
+ TestDescribeResult(DescribePath(runtime, "/MyRoot/Topic1"),
+ {NLs::Finished,
+ NLs::TopicAccountSize(expectedAccountSize),
+ NLs::TopicUsedReserveSize(expectedUsedReserveSize)});
+ };
+
+ TestCreatePQGroup(runtime, ++txId, "/MyRoot", R"(
+ Name: "Topic1"
+ TotalGroupCount: 1
+ PartitionPerTablet: 1
+ PQTabletConfig {
+ PartitionConfig {
+ LifetimeSeconds: 1
+ WriteSpeedInBytesPerSecond : 7
+
+ }
+ MeteringMode: METERING_MODE_RESERVED_CAPACITY
+ }
+ )");
+ env.TestWaitNotification(runtime, txId);
+ AssertTopicSize(7, 0);
+
+
+ ui64 topic1Id = DescribePath(runtime, "/MyRoot/Topic1").GetPathDescription().GetSelf().GetPathId();
+
+ ui64 generation = 1;
+ ui64 round = 97;
+
+ SendTEvPeriodicTopicStats(runtime, topic1Id, generation, round, 17, 7);
+ AssertTopicSize(17, 7);
+
+ GracefulRestartTablet(runtime, TTestTxConfig::SchemeShard, sender);
+
+ AssertTopicSize(17, 7); // loaded from db
+
+ SendTEvPeriodicTopicStats(runtime, topic1Id, generation, round - 1, 19, 7);
+
+ AssertTopicSize(17, 7); // not changed because round is less
+ }
+
};
diff --git a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema
index 1e2fe85de6..8a93d30cc3 100644
--- a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema
+++ b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema
@@ -6920,5 +6920,64 @@
"Blobs": 1
}
}
+ },
+ {
+ "TableId": 106,
+ "TableName": "PersQueueGroupStats",
+ "TableKey": [
+ 1
+ ],
+ "ColumnsAdded": [
+ {
+ "ColumnId": 1,
+ "ColumnName": "PathId",
+ "ColumnType": "Uint64"
+ },
+ {
+ "ColumnId": 2,
+ "ColumnName": "SeqNoGeneration",
+ "ColumnType": "Uint64"
+ },
+ {
+ "ColumnId": 3,
+ "ColumnName": "SeqNoRound",
+ "ColumnType": "Uint64"
+ },
+ {
+ "ColumnId": 4,
+ "ColumnName": "DataSize",
+ "ColumnType": "Uint64"
+ },
+ {
+ "ColumnId": 5,
+ "ColumnName": "UsedReserveSize",
+ "ColumnType": "Uint64"
+ }
+ ],
+ "ColumnsDropped": [],
+ "ColumnFamilies": {
+ "0": {
+ "Columns": [
+ 1,
+ 2,
+ 3,
+ 4,
+ 5
+ ],
+ "RoomID": 0,
+ "Codec": 0,
+ "InMemory": false,
+ "Cache": 0,
+ "Small": 4294967295,
+ "Large": 4294967295
+ }
+ },
+ "Rooms": {
+ "0": {
+ "Main": 1,
+ "Outer": 1,
+ "Blobs": 1
+ }
+ }
}
] \ No newline at end of file