diff options
author | tesseract <tesseract@yandex-team.com> | 2023-05-24 15:37:15 +0300 |
---|---|---|
committer | tesseract <tesseract@yandex-team.com> | 2023-05-24 15:37:15 +0300 |
commit | 938940585bd002f480d1a8c29cb707085c2af3a5 (patch) | |
tree | 58cabe420216daf762ea227701fa416034e808e7 | |
parent | e067ade1b37cae5f22ca9d91c3e67e2511b7185d (diff) | |
download | ydb-938940585bd002f480d1a8c29cb707085c2af3a5.tar.gz |
Topic storage size metrics #2
-rw-r--r-- | ydb/core/persqueue/partition.cpp | 46 | ||||
-rw-r--r-- | ydb/core/persqueue/partition.h | 11 | ||||
-rw-r--r-- | ydb/core/persqueue/ut/common/pq_ut_common.cpp | 1 | ||||
-rw-r--r-- | ydb/core/persqueue/ut/common/pq_ut_common.h | 1 | ||||
-rw-r--r-- | ydb/core/persqueue/ut/counters_ut.cpp | 5 | ||||
-rw-r--r-- | ydb/core/persqueue/ut/resources/counters_labeled.json | 36 | ||||
-rw-r--r-- | ydb/core/persqueue/ut/resources/counters_topics.html | 2 | ||||
-rw-r--r-- | ydb/core/protos/counters_pq.proto | 3 | ||||
-rw-r--r-- | ydb/core/protos/counters_schemeshard.proto | 1 | ||||
-rw-r--r-- | ydb/core/tablet/tablet_counters_aggregator.cpp | 14 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard__operation_deallocate_pq.cpp | 1 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard__operation_drop_pq.cpp | 1 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard__pq_stats.cpp | 1 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard_impl.cpp | 4 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard_impl.h | 1 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard_info_types.h | 1 |
16 files changed, 109 insertions, 20 deletions
diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index 53e01a7a22..842b9bde60 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -181,9 +181,23 @@ ui64 TPartition::MeteringDataSize(const TActorContext& ctx) const { return size; } +ui64 TPartition::ReserveSize() const { + return TopicPartitionReserveSize(Config); +} + +ui64 TPartition::StorageSize(const TActorContext& ctx) const { + return std::max<ui64>(MeteringDataSize(ctx), ReserveSize()); +} + +ui64 TPartition::UsedReserveSize(const TActorContext& ctx) const { + return std::min<ui64>(MeteringDataSize(ctx), ReserveSize()); +} + + ui64 TPartition::GetUsedStorage(const TActorContext& ctx) { - auto duration = ctx.Now() - LastUsedStorageMeterTimestamp; - LastUsedStorageMeterTimestamp = ctx.Now(); + const auto now = ctx.Now(); + const auto duration = now - LastUsedStorageMeterTimestamp; + LastUsedStorageMeterTimestamp = now; ui64 size = MeteringDataSize(ctx); return size * duration.MilliSeconds() / 1000 / 1_MB; // mb*seconds } @@ -203,7 +217,7 @@ void TPartition::HandleWakeup(const TActorContext& ctx) { ProcessHasDataRequests(ctx); - auto now = ctx.Now(); + const auto now = ctx.Now(); for (auto& userInfo : UsersInfoStorage->GetAll()) { userInfo.second.UpdateReadingTimeAndState(now); for (auto& avg : userInfo.second.AvgReadBytes) { @@ -237,8 +251,8 @@ void TPartition::HandleWakeup(const TActorContext& ctx) { } if (haveChanges) { - WriteCycleStartTime = ctx.Now(); - WriteStartTime = ctx.Now(); + WriteCycleStartTime = now; + WriteStartTime = now; TopicQuotaWaitTimeForCurrentBlob = TDuration::Zero(); WritesTotal.Inc(); Become(&TThis::StateWrite); @@ -1178,11 +1192,25 @@ bool TPartition::UpdateCounters(const TActorContext& ctx) { } } - ui64 partSize = Size(); - if (partSize != PartitionCountersLabeled->GetCounters()[METRIC_TOTAL_PART_SIZE].Get()) { + ui64 storageSize = StorageSize(ctx); + if (storageSize != PartitionCountersLabeled->GetCounters()[METRIC_TOTAL_PART_SIZE].Get()) { haveChanges = true; - PartitionCountersLabeled->GetCounters()[METRIC_MAX_PART_SIZE].Set(partSize); - PartitionCountersLabeled->GetCounters()[METRIC_TOTAL_PART_SIZE].Set(partSize); + PartitionCountersLabeled->GetCounters()[METRIC_MAX_PART_SIZE].Set(storageSize); + PartitionCountersLabeled->GetCounters()[METRIC_TOTAL_PART_SIZE].Set(storageSize); + } + + if (NKikimrPQ::TPQTabletConfig::METERING_MODE_RESERVED_CAPACITY == Config.GetMeteringMode()) { + ui64 reserveSize = ReserveSize(); + if (reserveSize != PartitionCountersLabeled->GetCounters()[METRIC_RESERVE_LIMIT_BYTES].Get()) { + haveChanges = true; + PartitionCountersLabeled->GetCounters()[METRIC_RESERVE_LIMIT_BYTES].Set(reserveSize); + } + + ui64 reserveUsed = UsedReserveSize(ctx); + if (reserveUsed != PartitionCountersLabeled->GetCounters()[METRIC_RESERVE_USED_BYTES].Get()) { + haveChanges = true; + PartitionCountersLabeled->GetCounters()[METRIC_RESERVE_USED_BYTES].Set(reserveUsed); + } } ui64 ts = (WriteTimestamp.MilliSeconds() < MIN_TIMESTAMP_MS) ? Max<i64>() : WriteTimestamp.MilliSeconds(); diff --git a/ydb/core/persqueue/partition.h b/ydb/core/persqueue/partition.h index bedc9c2c8e..544ef7d6f0 100644 --- a/ydb/core/persqueue/partition.h +++ b/ydb/core/persqueue/partition.h @@ -338,14 +338,9 @@ public: } ui64 MeteringDataSize(const TActorContext& ctx) const; - - ui64 UsedReserveSize(const TActorContext& ctx) const { - return std::min<ui64>(MeteringDataSize(ctx), ReserveSize()); - } - - ui64 ReserveSize() const { - return TopicPartitionReserveSize(Config); - } + ui64 ReserveSize() const; + ui64 StorageSize(const TActorContext& ctx) const; + ui64 UsedReserveSize(const TActorContext& ctx) const; //Bootstrap sends kvRead diff --git a/ydb/core/persqueue/ut/common/pq_ut_common.cpp b/ydb/core/persqueue/ut/common/pq_ut_common.cpp index c658c01b69..e3e2e27da0 100644 --- a/ydb/core/persqueue/ut/common/pq_ut_common.cpp +++ b/ydb/core/persqueue/ut/common/pq_ut_common.cpp @@ -67,6 +67,7 @@ void PQTabletPrepare(const TTabletPreparationParameters& parameters, tabletConfig->SetLocalDC(parameters.localDC); tabletConfig->AddReadRules("user"); tabletConfig->AddReadFromTimestampsMs(parameters.readFromTimestampsMs); + tabletConfig->SetMeteringMode(parameters.meteringMode); auto config = tabletConfig->MutablePartitionConfig(); if (parameters.speed > 0) { config->SetWriteSpeedInBytesPerSecond(parameters.speed); diff --git a/ydb/core/persqueue/ut/common/pq_ut_common.h b/ydb/core/persqueue/ut/common/pq_ut_common.h index 8dc97c7812..003c89642a 100644 --- a/ydb/core/persqueue/ut/common/pq_ut_common.h +++ b/ydb/core/persqueue/ut/common/pq_ut_common.h @@ -260,6 +260,7 @@ struct TTabletPreparationParameters { TString databaseId{"PQ"}; TString databasePath{"/Root/PQ"}; TString account{"federationAccount"}; + ::NKikimrPQ::TPQTabletConfig_EMeteringMode meteringMode = NKikimrPQ::TPQTabletConfig::METERING_MODE_RESERVED_CAPACITY; }; void PQTabletPrepare( const TTabletPreparationParameters& parameters, diff --git a/ydb/core/persqueue/ut/counters_ut.cpp b/ydb/core/persqueue/ut/counters_ut.cpp index c3c06893ec..09e5beb09b 100644 --- a/ydb/core/persqueue/ut/counters_ut.cpp +++ b/ydb/core/persqueue/ut/counters_ut.cpp @@ -149,6 +149,9 @@ void CompareJsons(const TString& inputStr, const TString& referenceStr) { NJson::TJsonValue inputJson; UNIT_ASSERT(NJson::ReadJsonTree(TStringBuf(inputStr), &inputJson)); + Cerr << "Expected: " << referenceStr << Endl; + Cerr << "Result: " << inputStr << Endl; + // Run time of test differs as well as counters below. // We set it to 5000 and then compare with reference string. auto getByPath = [](const NJson::TJsonValue& msg, TStringBuf path) { @@ -261,7 +264,7 @@ Y_UNIT_TEST(PartitionFirstClass) { return TTestActorRuntime::DefaultObserverFunc(runtime, event); }); - PQTabletPrepare({}, {{"client", true}}, tc); + PQTabletPrepare({.deleteTime=3600, .meteringMode = NKikimrPQ::TPQTabletConfig::METERING_MODE_REQUEST_UNITS}, {{"client", true}}, tc); TFakeSchemeShardState::TPtr state{new TFakeSchemeShardState()}; ui64 ssId = 325; BootFakeSchemeShard(*tc.Runtime, ssId, state); diff --git a/ydb/core/persqueue/ut/resources/counters_labeled.json b/ydb/core/persqueue/ut/resources/counters_labeled.json index 28fd813977..85804aa58b 100644 --- a/ydb/core/persqueue/ut/resources/counters_labeled.json +++ b/ydb/core/persqueue/ut/resources/counters_labeled.json @@ -1817,6 +1817,24 @@ "labels": { "user_counters": "PersQueue", "topic": "rt3.dc1--asdfgs--topic", + "sensor": "PQ/ReserveLimitBytes" + }, + "value": 0 + }, + { + "kind": "GAUGE", + "labels": { + "user_counters": "PersQueue", + "topic": "rt3.dc1--asdfgs--topic", + "sensor": "PQ/ReserveUsedBytes" + }, + "value": 0 + }, + { + "kind": "GAUGE", + "labels": { + "user_counters": "PersQueue", + "topic": "rt3.dc1--asdfgs--topic", "sensor": "PQ/SourceIdCount" }, "value": 3 @@ -2123,6 +2141,24 @@ "labels": { "user_counters": "PersQueue", "topic": "total", + "sensor": "PQ/ReserveLimitBytes" + }, + "value": 0 + }, + { + "kind": "GAUGE", + "labels": { + "user_counters": "PersQueue", + "topic": "total", + "sensor": "PQ/ReserveUsedBytes" + }, + "value": 0 + }, + { + "kind": "GAUGE", + "labels": { + "user_counters": "PersQueue", + "topic": "total", "sensor": "PQ/SourceIdCount" }, "value": 3 diff --git a/ydb/core/persqueue/ut/resources/counters_topics.html b/ydb/core/persqueue/ut/resources/counters_topics.html index 902fcca125..1bf45fdec3 100644 --- a/ydb/core/persqueue/ut/resources/counters_topics.html +++ b/ydb/core/persqueue/ut/resources/counters_topics.html @@ -24,6 +24,8 @@ host=: name=topic.partition.write.speed_limit_bytes_per_second: 50000000 name=topic.partition.write.throttled_nanoseconds_max: 0 name=topic.producers_count: 3 + name=topic.reserve.limit_bytes: 0 + name=topic.reserve.used_bytes: 0 name=topic.storage_bytes: 747 consumer=client: diff --git a/ydb/core/protos/counters_pq.proto b/ydb/core/protos/counters_pq.proto index 981e4820bd..466de907f8 100644 --- a/ydb/core/protos/counters_pq.proto +++ b/ydb/core/protos/counters_pq.proto @@ -228,4 +228,7 @@ enum EPartitionLabeledCounters { METRIC_MIN_SID_LIFETIME = 33 [(LabeledCounterOpts) = {Name: "SourceIdMinLifetimeMs" AggrFunc : EAF_MIN SVName: ""}]; METRIC_PARTITIONS_TOTAL = 34 [(LabeledCounterOpts) = {Name: "PartitionsTotal" AggrFunc : EAF_MAX SVName: "topic.partition.total_count"}]; + + METRIC_RESERVE_LIMIT_BYTES = 35 [(LabeledCounterOpts) = {Name: "ReserveLimitBytes" AggrFunc : EAF_SUM SVName: "topic.reserve.limit_bytes"}]; + METRIC_RESERVE_USED_BYTES = 36 [(LabeledCounterOpts) = {Name: "ReserveUsedBytes" AggrFunc : EAF_SUM SVName: "topic.reserve.used_bytes"}]; } diff --git a/ydb/core/protos/counters_schemeshard.proto b/ydb/core/protos/counters_schemeshard.proto index 5b6843dfda..73cf72a9cf 100644 --- a/ydb/core/protos/counters_schemeshard.proto +++ b/ydb/core/protos/counters_schemeshard.proto @@ -185,6 +185,7 @@ enum ESimpleCounters { COUNTER_IN_FLIGHT_OPS_TxAlterExternalDataSource = 150 [(CounterOpts) = {Name: "InFlightOps/AlterExternalDataSource"}]; COUNTER_PQ_STATS_QUEUE_SIZE = 151 [(CounterOpts) = {Name: "PQStatsQueueSize"}]; + COUNTER_DISK_SPACE_TOPICS_TOTAL_BYTES = 152 [(CounterOpts) = {Name: "DiskSpaceTopicsTotalBytes"}]; } enum ECumulativeCounters { diff --git a/ydb/core/tablet/tablet_counters_aggregator.cpp b/ydb/core/tablet/tablet_counters_aggregator.cpp index 3f6e6bdd4b..f677e799af 100644 --- a/ydb/core/tablet/tablet_counters_aggregator.cpp +++ b/ydb/core/tablet/tablet_counters_aggregator.cpp @@ -760,6 +760,8 @@ private: TCounterPtr DatashardSizeBytes; TCounterPtr ResourcesStorageUsedBytes; TCounterPtr ResourcesStorageLimitBytes; + TCounterPtr ResourcesStorageTableUsedBytes; + TCounterPtr ResourcesStorageTopicUsedBytes; TCounterPtr ResourcesStreamUsedShards; TCounterPtr ResourcesStreamLimitShards; //TCounterPtr ResourcesStreamUsedShardsPercents; @@ -786,6 +788,7 @@ private: THistogramPtr ConsumedCpuHistogram; TCounterPtr DiskSpaceTablesTotalBytes; + TCounterPtr DiskSpaceTopicsTotalBytes; TCounterPtr DiskSpaceSoftQuotaBytes; TCounterPtr StreamShardsCount; @@ -827,6 +830,10 @@ private: "resources.storage.used_bytes", false); ResourcesStorageLimitBytes = ydbGroup->GetNamedCounter("name", "resources.storage.limit_bytes", false); + ResourcesStorageTableUsedBytes = ydbGroup->GetNamedCounter("name", + "resources.storage.table.used_bytes", false); + ResourcesStorageTopicUsedBytes = ydbGroup->GetNamedCounter("name", + "resources.storage.topic.used_bytes", false); ResourcesStreamUsedShards = ydbGroup->GetNamedCounter("name", "resources.stream.used_shards", false); @@ -879,6 +886,7 @@ private: auto appGroup = schemeshardGroup->GetSubgroup("category", "app"); DiskSpaceTablesTotalBytes = appGroup->GetCounter("SUM(SchemeShard/DiskSpaceTablesTotalBytes)"); + DiskSpaceTopicsTotalBytes = appGroup->GetCounter("SUM(SchemeShard/DiskSpaceTopicsTotalBytes)"); DiskSpaceSoftQuotaBytes = appGroup->GetCounter("SUM(SchemeShard/DiskSpaceSoftQuotaBytes)"); StreamShardsCount = appGroup->GetCounter("SUM(SchemeShard/StreamShardsCount)"); @@ -886,7 +894,6 @@ private: StreamReservedThroughput = appGroup->GetCounter("SUM(SchemeShard/StreamReservedThroughput)"); StreamReservedStorage = appGroup->GetCounter("SUM(SchemeShard/StreamReservedStorage)"); StreamReservedStorageLimit = appGroup->GetCounter("SUM(SchemeShard/StreamReservedStorageQuota)"); - } } @@ -911,8 +918,11 @@ private: } if (DiskSpaceTablesTotalBytes) { - ResourcesStorageUsedBytes->Set(DiskSpaceTablesTotalBytes->Val()); ResourcesStorageLimitBytes->Set(DiskSpaceSoftQuotaBytes->Val()); + ResourcesStorageTableUsedBytes->Set(DiskSpaceTablesTotalBytes->Val()); + ResourcesStorageTopicUsedBytes->Set(DiskSpaceTopicsTotalBytes->Val()); + + ResourcesStorageUsedBytes->Set(ResourcesStorageTableUsedBytes->Val() + ResourcesStorageTopicUsedBytes->Val()); auto quota = StreamShardsQuota->Val(); ResourcesStreamUsedShards->Set(StreamShardsCount->Val()); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_deallocate_pq.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_deallocate_pq.cpp index 5284f79137..8c5df04fb5 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_deallocate_pq.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_deallocate_pq.cpp @@ -119,6 +119,7 @@ public: domainInfo->DecPQReservedStorage(reserve.Storage); domainInfo->AggrDiskSpaceUsage({}, pqGroup->Stats); + context.SS->ChangeDiskSpaceTopicsTotalBytes(domainInfo->GetPQAccountStorage()); context.SS->TabletCounters->Simple()[COUNTER_STREAM_RESERVED_THROUGHPUT].Sub(reserve.Throughput); context.SS->TabletCounters->Simple()[COUNTER_STREAM_RESERVED_STORAGE].Sub(reserve.Storage); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_pq.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_pq.cpp index 7a3c092762..f31c164d75 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_pq.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_pq.cpp @@ -196,6 +196,7 @@ public: context.OnComplete.PublishToSchemeBoard(OperationId, subDomainId); } + context.SS->ChangeDiskSpaceTopicsTotalBytes(domainInfo->GetPQAccountStorage()); context.SS->TabletCounters->Simple()[COUNTER_STREAM_RESERVED_THROUGHPUT].Sub(reserve.Throughput); context.SS->TabletCounters->Simple()[COUNTER_STREAM_RESERVED_STORAGE].Sub(reserve.Storage); diff --git a/ydb/core/tx/schemeshard/schemeshard__pq_stats.cpp b/ydb/core/tx/schemeshard/schemeshard__pq_stats.cpp index 93b2f4cd8e..9e5bd3baae 100644 --- a/ydb/core/tx/schemeshard/schemeshard__pq_stats.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__pq_stats.cpp @@ -65,6 +65,7 @@ bool TTxStoreTopicStats::PersistSingleStats(const TPathId& pathId, const TStatsQ NIceDb::TNiceDb db(txc.DB); Self->PersistPersQueueGroupStats(db, pathId, newStats); + Self->ChangeDiskSpaceTopicsTotalBytes(subDomainInfo->GetPQAccountStorage()); if (subDomainInfo->CheckDiskSpaceQuotas(Self)) { auto subDomainId = Self->ResolvePathIdForDomain(pathId); diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp index b2bc43c6e0..e5f35481e5 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp @@ -6690,6 +6690,10 @@ void TSchemeShard::ChangeDiskSpaceTablesTotalBytes(i64 delta) { TabletCounters->Simple()[COUNTER_DISK_SPACE_TABLES_TOTAL_BYTES].Add(delta); } +void TSchemeShard::ChangeDiskSpaceTopicsTotalBytes(ui64 value) { + TabletCounters->Simple()[COUNTER_DISK_SPACE_TOPICS_TOTAL_BYTES].Set(value); +} + void TSchemeShard::ChangeDiskSpaceQuotaExceeded(i64 delta) { TabletCounters->Simple()[COUNTER_DISK_SPACE_QUOTA_EXCEEDED].Add(delta); } diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.h b/ydb/core/tx/schemeshard/schemeshard_impl.h index a96d221808..538033b9b0 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.h +++ b/ydb/core/tx/schemeshard/schemeshard_impl.h @@ -1248,6 +1248,7 @@ public: void ChangeDiskSpaceTablesDataBytes(i64 delta) override; void ChangeDiskSpaceTablesIndexBytes(i64 delta) override; void ChangeDiskSpaceTablesTotalBytes(i64 delta) override; + void ChangeDiskSpaceTopicsTotalBytes(ui64 value) override; void ChangeDiskSpaceQuotaExceeded(i64 delta) override; void ChangeDiskSpaceHardQuotaBytes(i64 delta) override; void ChangeDiskSpaceSoftQuotaBytes(i64 delta) override; diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.h b/ydb/core/tx/schemeshard/schemeshard_info_types.h index df26fad27f..bd9ade2fad 100644 --- a/ydb/core/tx/schemeshard/schemeshard_info_types.h +++ b/ydb/core/tx/schemeshard/schemeshard_info_types.h @@ -1410,6 +1410,7 @@ struct IQuotaCounters { virtual void ChangeDiskSpaceTablesDataBytes(i64 delta) = 0; virtual void ChangeDiskSpaceTablesIndexBytes(i64 delta) = 0; virtual void ChangeDiskSpaceTablesTotalBytes(i64 delta) = 0; + virtual void ChangeDiskSpaceTopicsTotalBytes(ui64 value) = 0; virtual void ChangeDiskSpaceQuotaExceeded(i64 delta) = 0; virtual void ChangeDiskSpaceHardQuotaBytes(i64 delta) = 0; virtual void ChangeDiskSpaceSoftQuotaBytes(i64 delta) = 0; |