diff options
author | abcdef <akotov@ydb.tech> | 2023-06-07 13:36:38 +0300 |
---|---|---|
committer | abcdef <akotov@ydb.tech> | 2023-06-07 13:36:38 +0300 |
commit | cb91ffc1454a6cc69c96fbba4f532454dc4cf747 (patch) | |
tree | 2dd6942c6ab180b68a1fc1b47e0c32c110be4fb2 | |
parent | aa3ba3163b1c51da3a1c01a4205f3c1b1f387928 (diff) | |
download | ydb-cb91ffc1454a6cc69c96fbba4f532454dc4cf747.tar.gz |
fixed test DataStreams::TestReservedStorageMetering
Исправлены ошибки:
1) Для схемы `yds.storage.reserved.v1` в цикле не учитывалась граница часа.
2) Границы интервала времени могут быть в разных секундах, но разница между ними может быть меньше секунды. Это влияет на расчёт потребления ресурсов.
-rw-r--r-- | ydb/core/persqueue/metering_sink.cpp | 47 | ||||
-rw-r--r-- | ydb/services/datastreams/datastreams_ut.cpp | 47 |
2 files changed, 54 insertions, 40 deletions
diff --git a/ydb/core/persqueue/metering_sink.cpp b/ydb/core/persqueue/metering_sink.cpp index 8d982498ccf..0f1e6ef7287 100644 --- a/ydb/core/persqueue/metering_sink.cpp +++ b/ydb/core/persqueue/metering_sink.cpp @@ -69,9 +69,9 @@ bool TMeteringSink::IsCreated() const { } TString TMeteringSink::GetMeteringJson(const TString& metricBillingId, const TString& schemeName, - const THashMap<TString, ui64>& tags, - const TString& quantityUnit, ui64 quantity, - TInstant start, TInstant end, TInstant now, const TString& version) { + const THashMap<TString, ui64>& tags, + const TString& quantityUnit, ui64 quantity, + TInstant start, TInstant end, TInstant now, const TString& version) { MeteringCounter_.fetch_add(1); TStringStream output; NJson::TJsonWriter writer(&output, false); @@ -238,22 +238,22 @@ void TMeteringSink::Flush(TInstant now, bool force) { {"reserved_consumers_count", Parameters_.ConsumersCount} }; auto interval = TInstant::Hours(LastFlush_[whichOne].Hours()) + Parameters_.FlushLimit; - while (interval < now) { + + auto tryFlush = [&](TInstant start, TInstant finish) { const auto metricsJson = GetMeteringJson( name, schema, tags, "second", - Parameters_.PartitionsSize * (interval - LastFlush_[whichOne]).Seconds(), - LastFlush_[whichOne], interval, now); - LastFlush_[whichOne] = interval; + Parameters_.PartitionsSize * (finish.Seconds() - start.Seconds()), + start, finish, now); FlushFunction_(metricsJson); + LastFlush_[whichOne] = finish; + }; + + while (interval < now) { + tryFlush(LastFlush_[whichOne], interval); interval += Parameters_.FlushLimit; } if (LastFlush_[whichOne] < now) { - const auto metricsJson = GetMeteringJson( - name, schema, tags, "second", - Parameters_.PartitionsSize * (now - LastFlush_[whichOne]).Seconds(), - LastFlush_[whichOne], now, now); - LastFlush_[whichOne] = now; - FlushFunction_(metricsJson); + tryFlush(LastFlush_[whichOne], now); } } break; @@ -266,24 +266,23 @@ void TMeteringSink::Flush(TInstant now, bool force) { const TString name = "yds.reserved_resources"; const TString schema = "yds.storage.reserved.v1"; auto interval = TInstant::Hours(LastFlush_[whichOne].Hours()) + Parameters_.FlushLimit; - while (interval < now) { + + auto tryFlush = [&](TInstant start, TInstant finish) { const auto metricsJson = GetMeteringJson( name, schema, {}, "mbyte*second", Parameters_.PartitionsSize * (Parameters_.ReservedSpace / 1_MB) * - (now - LastFlush_[whichOne]).Seconds(), - LastFlush_[whichOne], interval, now); - LastFlush_[whichOne] = interval; + (finish.Seconds() - start.Seconds()), + start, finish, now); FlushFunction_(metricsJson); + LastFlush_[whichOne] = finish; + }; + + while (interval < now) { + tryFlush(LastFlush_[whichOne], interval); interval += Parameters_.FlushLimit; } if (LastFlush_[whichOne] < now) { - const auto metricsJson = GetMeteringJson( - name, schema, {}, "mbyte*second", - Parameters_.PartitionsSize * (Parameters_.ReservedSpace / 1_MB) * - (now - LastFlush_[whichOne]).Seconds(), - LastFlush_[whichOne], now, now); - LastFlush_[whichOne] = now; - FlushFunction_(metricsJson); + tryFlush(LastFlush_[whichOne], now); } } break; diff --git a/ydb/services/datastreams/datastreams_ut.cpp b/ydb/services/datastreams/datastreams_ut.cpp index 60a182fc706..eaa46ca2a1e 100644 --- a/ydb/services/datastreams/datastreams_ut.cpp +++ b/ydb/services/datastreams/datastreams_ut.cpp @@ -476,17 +476,22 @@ Y_UNIT_TEST_SUITE(DataStreams) { UNIT_ASSERT_VALUES_EQUAL( labels.find("ydb_database")->second.GetString(), "root"); }, - [storageMb](const NJson::TJsonValue::TMapType& map) { + [](const NJson::TJsonValue::TMapType& map) { UNIT_ASSERT(map.contains("usage")); auto& usage = map.find("usage")->second.GetMap(); - UNIT_ASSERT_GT(usage.find("start")->second.GetUInteger(), - TInstant::Now().Seconds() - 10); - UNIT_ASSERT_GT(usage.find("finish")->second.GetUInteger(), - TInstant::Now().Seconds() - 9); + + auto start = usage.find("start")->second.GetUInteger(); + auto finish = usage.find("finish")->second.GetUInteger(); + UNIT_ASSERT_LE(start, finish); + + auto now = TInstant::Now(); + UNIT_ASSERT_GT(start, now.Seconds() - 10); + UNIT_ASSERT_GT(finish, now.Seconds() - 9); UNIT_ASSERT_VALUES_EQUAL(usage.find("unit")->second.GetString(), "mbyte*second"); + UNIT_ASSERT_VALUES_EQUAL(usage.find("quantity")->second.GetUInteger(), - storageMb); + storageMb * (finish - start)); }); UNIT_ASSERT_VALUES_EQUAL(storageSchemaFound, 8); @@ -516,11 +521,16 @@ Y_UNIT_TEST_SUITE(DataStreams) { [](const NJson::TJsonValue::TMapType& map) { UNIT_ASSERT(map.contains("usage")); auto& usage = map.find("usage")->second.GetMap(); - UNIT_ASSERT_VALUES_EQUAL(usage.find("quantity")->second.GetInteger(), 1); - UNIT_ASSERT_GT(usage.find("start")->second.GetUInteger(), - TInstant::Now().Seconds() - 10); - UNIT_ASSERT_GT(usage.find("finish")->second.GetUInteger(), - TInstant::Now().Seconds() - 9); + + auto start = usage.find("start")->second.GetUInteger(); + auto finish = usage.find("finish")->second.GetUInteger(); + UNIT_ASSERT_LE(start, finish); + + UNIT_ASSERT_VALUES_EQUAL(usage.find("quantity")->second.GetInteger(), finish - start); + + auto now = TInstant::Now(); + UNIT_ASSERT_GT(start, now.Seconds() - 10); + UNIT_ASSERT_GT(finish, now.Seconds() - 9); UNIT_ASSERT_VALUES_EQUAL(usage.find("unit")->second.GetString(), "second"); }); @@ -689,11 +699,16 @@ Y_UNIT_TEST_SUITE(DataStreams) { [](const NJson::TJsonValue::TMapType& map) { UNIT_ASSERT(map.contains("usage")); auto& usage = map.find("usage")->second.GetMap(); - UNIT_ASSERT(usage.find("quantity")->second.GetInteger() <= 1); - UNIT_ASSERT_GT(usage.find("start")->second.GetUInteger(), - TInstant::Now().Seconds() - 10); - UNIT_ASSERT_GT(usage.find("finish")->second.GetUInteger(), - TInstant::Now().Seconds() - 9); + + auto start = usage.find("start")->second.GetUInteger(); + auto finish = usage.find("finish")->second.GetUInteger(); + UNIT_ASSERT_LE(start, finish); + + UNIT_ASSERT_VALUES_EQUAL(usage.find("quantity")->second.GetInteger(), finish - start); + + auto now = TInstant::Now(); + UNIT_ASSERT_GT(start, now.Seconds() - 10); + UNIT_ASSERT_GT(finish, now.Seconds() - 9); UNIT_ASSERT_VALUES_EQUAL(usage.find("unit")->second.GetString(), "second"); }); |