diff options
author | mokhotskii <mokhotskii@yandex-team.ru> | 2022-06-15 13:44:02 +0300 |
---|---|---|
committer | Daniil Cherednik <dan.cherednik@gmail.com> | 2022-06-15 13:44:02 +0300 |
commit | b03d6efa25ad31f6c8629c7a2f116f8c9ba8ef00 (patch) | |
tree | 095198bcc7bf4c43955787806f686cffca9dac9c | |
parent | 9d200e5bcf064005177db031e405e346a73d50c2 (diff) | |
download | ydb-b03d6efa25ad31f6c8629c7a2f116f8c9ba8ef00.tar.gz |
[merge to 22-2] Add FlushLimit to TMeteringSink
Add FlushLimit to TMeteringSink
It limits time period in one metering json. By default each json
contains at most 1hour time period.
REVIEW: 2580814
REVIEW: 2632151
x-ydb-stable-ref: f525de6bfdf3958a7bfd65061afdd7e29bdfd4d4
-rw-r--r-- | ydb/core/persqueue/metering_sink.cpp | 12 | ||||
-rw-r--r-- | ydb/core/persqueue/metering_sink.h | 1 |
2 files changed, 7 insertions, 6 deletions
diff --git a/ydb/core/persqueue/metering_sink.cpp b/ydb/core/persqueue/metering_sink.cpp index 942650760d..0726ff94c7 100644 --- a/ydb/core/persqueue/metering_sink.cpp +++ b/ydb/core/persqueue/metering_sink.cpp @@ -151,7 +151,7 @@ void TMeteringSink::Flush(TInstant now, bool force) { {"shard_enhanced_consumers_throughput", Parameters_.ConsumersThroughput}, {"reserved_storage_bytes", Parameters_.ReservedSpace} }; - auto interval = TInstant::Hours(LastFlush_[whichOne].Hours() + 1); + auto interval = TInstant::Hours(LastFlush_[whichOne].Hours()) + Parameters_.FlushLimit; while (interval < now) { const auto metricsJson = GetMeteringJson( name, schema, tags, "second", @@ -159,7 +159,7 @@ void TMeteringSink::Flush(TInstant now, bool force) { LastFlush_[whichOne], interval, now); LastFlush_[whichOne] = interval; FlushFunction_(metricsJson); - interval += TDuration::Hours(1); + interval += Parameters_.FlushLimit; } if (LastFlush_[whichOne] < now) { const auto metricsJson = GetMeteringJson( @@ -182,7 +182,7 @@ void TMeteringSink::Flush(TInstant now, bool force) { const THashMap<TString, ui64> tags = { {"reserved_throughput_bps", Parameters_.WriteQuota}, }; - auto interval = TInstant::Hours(LastFlush_[whichOne].Hours() + 1); + auto interval = TInstant::Hours(LastFlush_[whichOne].Hours()) + Parameters_.FlushLimit; while (interval < now) { const auto metricsJson = GetMeteringJson( name, schema, tags, "second", @@ -190,7 +190,7 @@ void TMeteringSink::Flush(TInstant now, bool force) { LastFlush_[whichOne], interval, now); LastFlush_[whichOne] = interval; FlushFunction_(metricsJson); - interval += TDuration::Hours(1); + interval += Parameters_.FlushLimit; } if (LastFlush_[whichOne] < now) { const auto metricsJson = GetMeteringJson( @@ -210,7 +210,7 @@ void TMeteringSink::Flush(TInstant now, bool force) { } const TString name = "yds.reserved_resources"; const TString schema = "yds.storage.reserved.v1"; - auto interval = TInstant::Hours(LastFlush_[whichOne].Hours() + 1); + auto interval = TInstant::Hours(LastFlush_[whichOne].Hours()) + Parameters_.FlushLimit; while (interval < now) { const auto metricsJson = GetMeteringJson( name, schema, {}, "mbyte*second", @@ -219,7 +219,7 @@ void TMeteringSink::Flush(TInstant now, bool force) { LastFlush_[whichOne], interval, now); LastFlush_[whichOne] = interval; FlushFunction_(metricsJson); - interval += TDuration::Hours(1); + interval += Parameters_.FlushLimit; } if (LastFlush_[whichOne] < now) { const auto metricsJson = GetMeteringJson( diff --git a/ydb/core/persqueue/metering_sink.h b/ydb/core/persqueue/metering_sink.h index 6fe0154811..d836a50e8e 100644 --- a/ydb/core/persqueue/metering_sink.h +++ b/ydb/core/persqueue/metering_sink.h @@ -15,6 +15,7 @@ enum class EMeteringJson { class TMeteringSink { public: struct TParameters { + TDuration FlushLimit{TDuration::Hours(1)}; TDuration FlushInterval; TString TabletId; TString YcCloudId; |