aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authormokhotskii <mokhotskii@yandex-team.ru>2022-06-15 13:44:02 +0300
committerDaniil Cherednik <dan.cherednik@gmail.com>2022-06-15 13:44:02 +0300
commitb03d6efa25ad31f6c8629c7a2f116f8c9ba8ef00 (patch)
tree095198bcc7bf4c43955787806f686cffca9dac9c
parent9d200e5bcf064005177db031e405e346a73d50c2 (diff)
downloadydb-b03d6efa25ad31f6c8629c7a2f116f8c9ba8ef00.tar.gz
[merge to 22-2] Add FlushLimit to TMeteringSink
Add FlushLimit to TMeteringSink It limits time period in one metering json. By default each json contains at most 1hour time period. REVIEW: 2580814 REVIEW: 2632151 x-ydb-stable-ref: f525de6bfdf3958a7bfd65061afdd7e29bdfd4d4
-rw-r--r--ydb/core/persqueue/metering_sink.cpp12
-rw-r--r--ydb/core/persqueue/metering_sink.h1
2 files changed, 7 insertions, 6 deletions
diff --git a/ydb/core/persqueue/metering_sink.cpp b/ydb/core/persqueue/metering_sink.cpp
index 942650760d..0726ff94c7 100644
--- a/ydb/core/persqueue/metering_sink.cpp
+++ b/ydb/core/persqueue/metering_sink.cpp
@@ -151,7 +151,7 @@ void TMeteringSink::Flush(TInstant now, bool force) {
{"shard_enhanced_consumers_throughput", Parameters_.ConsumersThroughput},
{"reserved_storage_bytes", Parameters_.ReservedSpace}
};
- auto interval = TInstant::Hours(LastFlush_[whichOne].Hours() + 1);
+ auto interval = TInstant::Hours(LastFlush_[whichOne].Hours()) + Parameters_.FlushLimit;
while (interval < now) {
const auto metricsJson = GetMeteringJson(
name, schema, tags, "second",
@@ -159,7 +159,7 @@ void TMeteringSink::Flush(TInstant now, bool force) {
LastFlush_[whichOne], interval, now);
LastFlush_[whichOne] = interval;
FlushFunction_(metricsJson);
- interval += TDuration::Hours(1);
+ interval += Parameters_.FlushLimit;
}
if (LastFlush_[whichOne] < now) {
const auto metricsJson = GetMeteringJson(
@@ -182,7 +182,7 @@ void TMeteringSink::Flush(TInstant now, bool force) {
const THashMap<TString, ui64> tags = {
{"reserved_throughput_bps", Parameters_.WriteQuota},
};
- auto interval = TInstant::Hours(LastFlush_[whichOne].Hours() + 1);
+ auto interval = TInstant::Hours(LastFlush_[whichOne].Hours()) + Parameters_.FlushLimit;
while (interval < now) {
const auto metricsJson = GetMeteringJson(
name, schema, tags, "second",
@@ -190,7 +190,7 @@ void TMeteringSink::Flush(TInstant now, bool force) {
LastFlush_[whichOne], interval, now);
LastFlush_[whichOne] = interval;
FlushFunction_(metricsJson);
- interval += TDuration::Hours(1);
+ interval += Parameters_.FlushLimit;
}
if (LastFlush_[whichOne] < now) {
const auto metricsJson = GetMeteringJson(
@@ -210,7 +210,7 @@ void TMeteringSink::Flush(TInstant now, bool force) {
}
const TString name = "yds.reserved_resources";
const TString schema = "yds.storage.reserved.v1";
- auto interval = TInstant::Hours(LastFlush_[whichOne].Hours() + 1);
+ auto interval = TInstant::Hours(LastFlush_[whichOne].Hours()) + Parameters_.FlushLimit;
while (interval < now) {
const auto metricsJson = GetMeteringJson(
name, schema, {}, "mbyte*second",
@@ -219,7 +219,7 @@ void TMeteringSink::Flush(TInstant now, bool force) {
LastFlush_[whichOne], interval, now);
LastFlush_[whichOne] = interval;
FlushFunction_(metricsJson);
- interval += TDuration::Hours(1);
+ interval += Parameters_.FlushLimit;
}
if (LastFlush_[whichOne] < now) {
const auto metricsJson = GetMeteringJson(
diff --git a/ydb/core/persqueue/metering_sink.h b/ydb/core/persqueue/metering_sink.h
index 6fe0154811..d836a50e8e 100644
--- a/ydb/core/persqueue/metering_sink.h
+++ b/ydb/core/persqueue/metering_sink.h
@@ -15,6 +15,7 @@ enum class EMeteringJson {
class TMeteringSink {
public:
struct TParameters {
+ TDuration FlushLimit{TDuration::Hours(1)};
TDuration FlushInterval;
TString TabletId;
TString YcCloudId;