aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authortesseract <tesseract@yandex-team.com>2023-05-24 15:37:15 +0300
committertesseract <tesseract@yandex-team.com>2023-05-24 15:37:15 +0300
commit938940585bd002f480d1a8c29cb707085c2af3a5 (patch)
tree58cabe420216daf762ea227701fa416034e808e7
parente067ade1b37cae5f22ca9d91c3e67e2511b7185d (diff)
downloadydb-938940585bd002f480d1a8c29cb707085c2af3a5.tar.gz
Topic storage size metrics #2
-rw-r--r--ydb/core/persqueue/partition.cpp46
-rw-r--r--ydb/core/persqueue/partition.h11
-rw-r--r--ydb/core/persqueue/ut/common/pq_ut_common.cpp1
-rw-r--r--ydb/core/persqueue/ut/common/pq_ut_common.h1
-rw-r--r--ydb/core/persqueue/ut/counters_ut.cpp5
-rw-r--r--ydb/core/persqueue/ut/resources/counters_labeled.json36
-rw-r--r--ydb/core/persqueue/ut/resources/counters_topics.html2
-rw-r--r--ydb/core/protos/counters_pq.proto3
-rw-r--r--ydb/core/protos/counters_schemeshard.proto1
-rw-r--r--ydb/core/tablet/tablet_counters_aggregator.cpp14
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_deallocate_pq.cpp1
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_drop_pq.cpp1
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__pq_stats.cpp1
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_impl.cpp4
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_impl.h1
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_info_types.h1
16 files changed, 109 insertions, 20 deletions
diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp
index 53e01a7a22..842b9bde60 100644
--- a/ydb/core/persqueue/partition.cpp
+++ b/ydb/core/persqueue/partition.cpp
@@ -181,9 +181,23 @@ ui64 TPartition::MeteringDataSize(const TActorContext& ctx) const {
return size;
}
+ui64 TPartition::ReserveSize() const {
+ return TopicPartitionReserveSize(Config);
+}
+
+ui64 TPartition::StorageSize(const TActorContext& ctx) const {
+ return std::max<ui64>(MeteringDataSize(ctx), ReserveSize());
+}
+
+ui64 TPartition::UsedReserveSize(const TActorContext& ctx) const {
+ return std::min<ui64>(MeteringDataSize(ctx), ReserveSize());
+}
+
+
ui64 TPartition::GetUsedStorage(const TActorContext& ctx) {
- auto duration = ctx.Now() - LastUsedStorageMeterTimestamp;
- LastUsedStorageMeterTimestamp = ctx.Now();
+ const auto now = ctx.Now();
+ const auto duration = now - LastUsedStorageMeterTimestamp;
+ LastUsedStorageMeterTimestamp = now;
ui64 size = MeteringDataSize(ctx);
return size * duration.MilliSeconds() / 1000 / 1_MB; // mb*seconds
}
@@ -203,7 +217,7 @@ void TPartition::HandleWakeup(const TActorContext& ctx) {
ProcessHasDataRequests(ctx);
- auto now = ctx.Now();
+ const auto now = ctx.Now();
for (auto& userInfo : UsersInfoStorage->GetAll()) {
userInfo.second.UpdateReadingTimeAndState(now);
for (auto& avg : userInfo.second.AvgReadBytes) {
@@ -237,8 +251,8 @@ void TPartition::HandleWakeup(const TActorContext& ctx) {
}
if (haveChanges) {
- WriteCycleStartTime = ctx.Now();
- WriteStartTime = ctx.Now();
+ WriteCycleStartTime = now;
+ WriteStartTime = now;
TopicQuotaWaitTimeForCurrentBlob = TDuration::Zero();
WritesTotal.Inc();
Become(&TThis::StateWrite);
@@ -1178,11 +1192,25 @@ bool TPartition::UpdateCounters(const TActorContext& ctx) {
}
}
- ui64 partSize = Size();
- if (partSize != PartitionCountersLabeled->GetCounters()[METRIC_TOTAL_PART_SIZE].Get()) {
+ ui64 storageSize = StorageSize(ctx);
+ if (storageSize != PartitionCountersLabeled->GetCounters()[METRIC_TOTAL_PART_SIZE].Get()) {
haveChanges = true;
- PartitionCountersLabeled->GetCounters()[METRIC_MAX_PART_SIZE].Set(partSize);
- PartitionCountersLabeled->GetCounters()[METRIC_TOTAL_PART_SIZE].Set(partSize);
+ PartitionCountersLabeled->GetCounters()[METRIC_MAX_PART_SIZE].Set(storageSize);
+ PartitionCountersLabeled->GetCounters()[METRIC_TOTAL_PART_SIZE].Set(storageSize);
+ }
+
+ if (NKikimrPQ::TPQTabletConfig::METERING_MODE_RESERVED_CAPACITY == Config.GetMeteringMode()) {
+ ui64 reserveSize = ReserveSize();
+ if (reserveSize != PartitionCountersLabeled->GetCounters()[METRIC_RESERVE_LIMIT_BYTES].Get()) {
+ haveChanges = true;
+ PartitionCountersLabeled->GetCounters()[METRIC_RESERVE_LIMIT_BYTES].Set(reserveSize);
+ }
+
+ ui64 reserveUsed = UsedReserveSize(ctx);
+ if (reserveUsed != PartitionCountersLabeled->GetCounters()[METRIC_RESERVE_USED_BYTES].Get()) {
+ haveChanges = true;
+ PartitionCountersLabeled->GetCounters()[METRIC_RESERVE_USED_BYTES].Set(reserveUsed);
+ }
}
ui64 ts = (WriteTimestamp.MilliSeconds() < MIN_TIMESTAMP_MS) ? Max<i64>() : WriteTimestamp.MilliSeconds();
diff --git a/ydb/core/persqueue/partition.h b/ydb/core/persqueue/partition.h
index bedc9c2c8e..544ef7d6f0 100644
--- a/ydb/core/persqueue/partition.h
+++ b/ydb/core/persqueue/partition.h
@@ -338,14 +338,9 @@ public:
}
ui64 MeteringDataSize(const TActorContext& ctx) const;
-
- ui64 UsedReserveSize(const TActorContext& ctx) const {
- return std::min<ui64>(MeteringDataSize(ctx), ReserveSize());
- }
-
- ui64 ReserveSize() const {
- return TopicPartitionReserveSize(Config);
- }
+ ui64 ReserveSize() const;
+ ui64 StorageSize(const TActorContext& ctx) const;
+ ui64 UsedReserveSize(const TActorContext& ctx) const;
//Bootstrap sends kvRead
diff --git a/ydb/core/persqueue/ut/common/pq_ut_common.cpp b/ydb/core/persqueue/ut/common/pq_ut_common.cpp
index c658c01b69..e3e2e27da0 100644
--- a/ydb/core/persqueue/ut/common/pq_ut_common.cpp
+++ b/ydb/core/persqueue/ut/common/pq_ut_common.cpp
@@ -67,6 +67,7 @@ void PQTabletPrepare(const TTabletPreparationParameters& parameters,
tabletConfig->SetLocalDC(parameters.localDC);
tabletConfig->AddReadRules("user");
tabletConfig->AddReadFromTimestampsMs(parameters.readFromTimestampsMs);
+ tabletConfig->SetMeteringMode(parameters.meteringMode);
auto config = tabletConfig->MutablePartitionConfig();
if (parameters.speed > 0) {
config->SetWriteSpeedInBytesPerSecond(parameters.speed);
diff --git a/ydb/core/persqueue/ut/common/pq_ut_common.h b/ydb/core/persqueue/ut/common/pq_ut_common.h
index 8dc97c7812..003c89642a 100644
--- a/ydb/core/persqueue/ut/common/pq_ut_common.h
+++ b/ydb/core/persqueue/ut/common/pq_ut_common.h
@@ -260,6 +260,7 @@ struct TTabletPreparationParameters {
TString databaseId{"PQ"};
TString databasePath{"/Root/PQ"};
TString account{"federationAccount"};
+ ::NKikimrPQ::TPQTabletConfig_EMeteringMode meteringMode = NKikimrPQ::TPQTabletConfig::METERING_MODE_RESERVED_CAPACITY;
};
void PQTabletPrepare(
const TTabletPreparationParameters& parameters,
diff --git a/ydb/core/persqueue/ut/counters_ut.cpp b/ydb/core/persqueue/ut/counters_ut.cpp
index c3c06893ec..09e5beb09b 100644
--- a/ydb/core/persqueue/ut/counters_ut.cpp
+++ b/ydb/core/persqueue/ut/counters_ut.cpp
@@ -149,6 +149,9 @@ void CompareJsons(const TString& inputStr, const TString& referenceStr) {
NJson::TJsonValue inputJson;
UNIT_ASSERT(NJson::ReadJsonTree(TStringBuf(inputStr), &inputJson));
+ Cerr << "Expected: " << referenceStr << Endl;
+ Cerr << "Result: " << inputStr << Endl;
+
// Run time of test differs as well as counters below.
// We set it to 5000 and then compare with reference string.
auto getByPath = [](const NJson::TJsonValue& msg, TStringBuf path) {
@@ -261,7 +264,7 @@ Y_UNIT_TEST(PartitionFirstClass) {
return TTestActorRuntime::DefaultObserverFunc(runtime, event);
});
- PQTabletPrepare({}, {{"client", true}}, tc);
+ PQTabletPrepare({.deleteTime=3600, .meteringMode = NKikimrPQ::TPQTabletConfig::METERING_MODE_REQUEST_UNITS}, {{"client", true}}, tc);
TFakeSchemeShardState::TPtr state{new TFakeSchemeShardState()};
ui64 ssId = 325;
BootFakeSchemeShard(*tc.Runtime, ssId, state);
diff --git a/ydb/core/persqueue/ut/resources/counters_labeled.json b/ydb/core/persqueue/ut/resources/counters_labeled.json
index 28fd813977..85804aa58b 100644
--- a/ydb/core/persqueue/ut/resources/counters_labeled.json
+++ b/ydb/core/persqueue/ut/resources/counters_labeled.json
@@ -1817,6 +1817,24 @@
"labels": {
"user_counters": "PersQueue",
"topic": "rt3.dc1--asdfgs--topic",
+ "sensor": "PQ/ReserveLimitBytes"
+ },
+ "value": 0
+ },
+ {
+ "kind": "GAUGE",
+ "labels": {
+ "user_counters": "PersQueue",
+ "topic": "rt3.dc1--asdfgs--topic",
+ "sensor": "PQ/ReserveUsedBytes"
+ },
+ "value": 0
+ },
+ {
+ "kind": "GAUGE",
+ "labels": {
+ "user_counters": "PersQueue",
+ "topic": "rt3.dc1--asdfgs--topic",
"sensor": "PQ/SourceIdCount"
},
"value": 3
@@ -2123,6 +2141,24 @@
"labels": {
"user_counters": "PersQueue",
"topic": "total",
+ "sensor": "PQ/ReserveLimitBytes"
+ },
+ "value": 0
+ },
+ {
+ "kind": "GAUGE",
+ "labels": {
+ "user_counters": "PersQueue",
+ "topic": "total",
+ "sensor": "PQ/ReserveUsedBytes"
+ },
+ "value": 0
+ },
+ {
+ "kind": "GAUGE",
+ "labels": {
+ "user_counters": "PersQueue",
+ "topic": "total",
"sensor": "PQ/SourceIdCount"
},
"value": 3
diff --git a/ydb/core/persqueue/ut/resources/counters_topics.html b/ydb/core/persqueue/ut/resources/counters_topics.html
index 902fcca125..1bf45fdec3 100644
--- a/ydb/core/persqueue/ut/resources/counters_topics.html
+++ b/ydb/core/persqueue/ut/resources/counters_topics.html
@@ -24,6 +24,8 @@ host=:
name=topic.partition.write.speed_limit_bytes_per_second: 50000000
name=topic.partition.write.throttled_nanoseconds_max: 0
name=topic.producers_count: 3
+ name=topic.reserve.limit_bytes: 0
+ name=topic.reserve.used_bytes: 0
name=topic.storage_bytes: 747
consumer=client:
diff --git a/ydb/core/protos/counters_pq.proto b/ydb/core/protos/counters_pq.proto
index 981e4820bd..466de907f8 100644
--- a/ydb/core/protos/counters_pq.proto
+++ b/ydb/core/protos/counters_pq.proto
@@ -228,4 +228,7 @@ enum EPartitionLabeledCounters {
METRIC_MIN_SID_LIFETIME = 33 [(LabeledCounterOpts) = {Name: "SourceIdMinLifetimeMs" AggrFunc : EAF_MIN SVName: ""}];
METRIC_PARTITIONS_TOTAL = 34 [(LabeledCounterOpts) = {Name: "PartitionsTotal" AggrFunc : EAF_MAX SVName: "topic.partition.total_count"}];
+
+ METRIC_RESERVE_LIMIT_BYTES = 35 [(LabeledCounterOpts) = {Name: "ReserveLimitBytes" AggrFunc : EAF_SUM SVName: "topic.reserve.limit_bytes"}];
+ METRIC_RESERVE_USED_BYTES = 36 [(LabeledCounterOpts) = {Name: "ReserveUsedBytes" AggrFunc : EAF_SUM SVName: "topic.reserve.used_bytes"}];
}
diff --git a/ydb/core/protos/counters_schemeshard.proto b/ydb/core/protos/counters_schemeshard.proto
index 5b6843dfda..73cf72a9cf 100644
--- a/ydb/core/protos/counters_schemeshard.proto
+++ b/ydb/core/protos/counters_schemeshard.proto
@@ -185,6 +185,7 @@ enum ESimpleCounters {
COUNTER_IN_FLIGHT_OPS_TxAlterExternalDataSource = 150 [(CounterOpts) = {Name: "InFlightOps/AlterExternalDataSource"}];
COUNTER_PQ_STATS_QUEUE_SIZE = 151 [(CounterOpts) = {Name: "PQStatsQueueSize"}];
+ COUNTER_DISK_SPACE_TOPICS_TOTAL_BYTES = 152 [(CounterOpts) = {Name: "DiskSpaceTopicsTotalBytes"}];
}
enum ECumulativeCounters {
diff --git a/ydb/core/tablet/tablet_counters_aggregator.cpp b/ydb/core/tablet/tablet_counters_aggregator.cpp
index 3f6e6bdd4b..f677e799af 100644
--- a/ydb/core/tablet/tablet_counters_aggregator.cpp
+++ b/ydb/core/tablet/tablet_counters_aggregator.cpp
@@ -760,6 +760,8 @@ private:
TCounterPtr DatashardSizeBytes;
TCounterPtr ResourcesStorageUsedBytes;
TCounterPtr ResourcesStorageLimitBytes;
+ TCounterPtr ResourcesStorageTableUsedBytes;
+ TCounterPtr ResourcesStorageTopicUsedBytes;
TCounterPtr ResourcesStreamUsedShards;
TCounterPtr ResourcesStreamLimitShards;
//TCounterPtr ResourcesStreamUsedShardsPercents;
@@ -786,6 +788,7 @@ private:
THistogramPtr ConsumedCpuHistogram;
TCounterPtr DiskSpaceTablesTotalBytes;
+ TCounterPtr DiskSpaceTopicsTotalBytes;
TCounterPtr DiskSpaceSoftQuotaBytes;
TCounterPtr StreamShardsCount;
@@ -827,6 +830,10 @@ private:
"resources.storage.used_bytes", false);
ResourcesStorageLimitBytes = ydbGroup->GetNamedCounter("name",
"resources.storage.limit_bytes", false);
+ ResourcesStorageTableUsedBytes = ydbGroup->GetNamedCounter("name",
+ "resources.storage.table.used_bytes", false);
+ ResourcesStorageTopicUsedBytes = ydbGroup->GetNamedCounter("name",
+ "resources.storage.topic.used_bytes", false);
ResourcesStreamUsedShards = ydbGroup->GetNamedCounter("name",
"resources.stream.used_shards", false);
@@ -879,6 +886,7 @@ private:
auto appGroup = schemeshardGroup->GetSubgroup("category", "app");
DiskSpaceTablesTotalBytes = appGroup->GetCounter("SUM(SchemeShard/DiskSpaceTablesTotalBytes)");
+ DiskSpaceTopicsTotalBytes = appGroup->GetCounter("SUM(SchemeShard/DiskSpaceTopicsTotalBytes)");
DiskSpaceSoftQuotaBytes = appGroup->GetCounter("SUM(SchemeShard/DiskSpaceSoftQuotaBytes)");
StreamShardsCount = appGroup->GetCounter("SUM(SchemeShard/StreamShardsCount)");
@@ -886,7 +894,6 @@ private:
StreamReservedThroughput = appGroup->GetCounter("SUM(SchemeShard/StreamReservedThroughput)");
StreamReservedStorage = appGroup->GetCounter("SUM(SchemeShard/StreamReservedStorage)");
StreamReservedStorageLimit = appGroup->GetCounter("SUM(SchemeShard/StreamReservedStorageQuota)");
-
}
}
@@ -911,8 +918,11 @@ private:
}
if (DiskSpaceTablesTotalBytes) {
- ResourcesStorageUsedBytes->Set(DiskSpaceTablesTotalBytes->Val());
ResourcesStorageLimitBytes->Set(DiskSpaceSoftQuotaBytes->Val());
+ ResourcesStorageTableUsedBytes->Set(DiskSpaceTablesTotalBytes->Val());
+ ResourcesStorageTopicUsedBytes->Set(DiskSpaceTopicsTotalBytes->Val());
+
+ ResourcesStorageUsedBytes->Set(ResourcesStorageTableUsedBytes->Val() + ResourcesStorageTopicUsedBytes->Val());
auto quota = StreamShardsQuota->Val();
ResourcesStreamUsedShards->Set(StreamShardsCount->Val());
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_deallocate_pq.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_deallocate_pq.cpp
index 5284f79137..8c5df04fb5 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_deallocate_pq.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_deallocate_pq.cpp
@@ -119,6 +119,7 @@ public:
domainInfo->DecPQReservedStorage(reserve.Storage);
domainInfo->AggrDiskSpaceUsage({}, pqGroup->Stats);
+ context.SS->ChangeDiskSpaceTopicsTotalBytes(domainInfo->GetPQAccountStorage());
context.SS->TabletCounters->Simple()[COUNTER_STREAM_RESERVED_THROUGHPUT].Sub(reserve.Throughput);
context.SS->TabletCounters->Simple()[COUNTER_STREAM_RESERVED_STORAGE].Sub(reserve.Storage);
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_pq.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_pq.cpp
index 7a3c092762..f31c164d75 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_pq.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_pq.cpp
@@ -196,6 +196,7 @@ public:
context.OnComplete.PublishToSchemeBoard(OperationId, subDomainId);
}
+ context.SS->ChangeDiskSpaceTopicsTotalBytes(domainInfo->GetPQAccountStorage());
context.SS->TabletCounters->Simple()[COUNTER_STREAM_RESERVED_THROUGHPUT].Sub(reserve.Throughput);
context.SS->TabletCounters->Simple()[COUNTER_STREAM_RESERVED_STORAGE].Sub(reserve.Storage);
diff --git a/ydb/core/tx/schemeshard/schemeshard__pq_stats.cpp b/ydb/core/tx/schemeshard/schemeshard__pq_stats.cpp
index 93b2f4cd8e..9e5bd3baae 100644
--- a/ydb/core/tx/schemeshard/schemeshard__pq_stats.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__pq_stats.cpp
@@ -65,6 +65,7 @@ bool TTxStoreTopicStats::PersistSingleStats(const TPathId& pathId, const TStatsQ
NIceDb::TNiceDb db(txc.DB);
Self->PersistPersQueueGroupStats(db, pathId, newStats);
+ Self->ChangeDiskSpaceTopicsTotalBytes(subDomainInfo->GetPQAccountStorage());
if (subDomainInfo->CheckDiskSpaceQuotas(Self)) {
auto subDomainId = Self->ResolvePathIdForDomain(pathId);
diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp
index b2bc43c6e0..e5f35481e5 100644
--- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp
@@ -6690,6 +6690,10 @@ void TSchemeShard::ChangeDiskSpaceTablesTotalBytes(i64 delta) {
TabletCounters->Simple()[COUNTER_DISK_SPACE_TABLES_TOTAL_BYTES].Add(delta);
}
+void TSchemeShard::ChangeDiskSpaceTopicsTotalBytes(ui64 value) {
+ TabletCounters->Simple()[COUNTER_DISK_SPACE_TOPICS_TOTAL_BYTES].Set(value);
+}
+
void TSchemeShard::ChangeDiskSpaceQuotaExceeded(i64 delta) {
TabletCounters->Simple()[COUNTER_DISK_SPACE_QUOTA_EXCEEDED].Add(delta);
}
diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.h b/ydb/core/tx/schemeshard/schemeshard_impl.h
index a96d221808..538033b9b0 100644
--- a/ydb/core/tx/schemeshard/schemeshard_impl.h
+++ b/ydb/core/tx/schemeshard/schemeshard_impl.h
@@ -1248,6 +1248,7 @@ public:
void ChangeDiskSpaceTablesDataBytes(i64 delta) override;
void ChangeDiskSpaceTablesIndexBytes(i64 delta) override;
void ChangeDiskSpaceTablesTotalBytes(i64 delta) override;
+ void ChangeDiskSpaceTopicsTotalBytes(ui64 value) override;
void ChangeDiskSpaceQuotaExceeded(i64 delta) override;
void ChangeDiskSpaceHardQuotaBytes(i64 delta) override;
void ChangeDiskSpaceSoftQuotaBytes(i64 delta) override;
diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.h b/ydb/core/tx/schemeshard/schemeshard_info_types.h
index df26fad27f..bd9ade2fad 100644
--- a/ydb/core/tx/schemeshard/schemeshard_info_types.h
+++ b/ydb/core/tx/schemeshard/schemeshard_info_types.h
@@ -1410,6 +1410,7 @@ struct IQuotaCounters {
virtual void ChangeDiskSpaceTablesDataBytes(i64 delta) = 0;
virtual void ChangeDiskSpaceTablesIndexBytes(i64 delta) = 0;
virtual void ChangeDiskSpaceTablesTotalBytes(i64 delta) = 0;
+ virtual void ChangeDiskSpaceTopicsTotalBytes(ui64 value) = 0;
virtual void ChangeDiskSpaceQuotaExceeded(i64 delta) = 0;
virtual void ChangeDiskSpaceHardQuotaBytes(i64 delta) = 0;
virtual void ChangeDiskSpaceSoftQuotaBytes(i64 delta) = 0;