aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorabcdef <akotov@ydb.tech>2023-06-07 13:36:38 +0300
committerabcdef <akotov@ydb.tech>2023-06-07 13:36:38 +0300
commitcb91ffc1454a6cc69c96fbba4f532454dc4cf747 (patch)
tree2dd6942c6ab180b68a1fc1b47e0c32c110be4fb2
parentaa3ba3163b1c51da3a1c01a4205f3c1b1f387928 (diff)
downloadydb-cb91ffc1454a6cc69c96fbba4f532454dc4cf747.tar.gz
fixed test DataStreams::TestReservedStorageMetering
Исправлены ошибки: 1) Для схемы `yds.storage.reserved.v1` в цикле не учитывалась граница часа. 2) Границы интервала времени могут быть в разных секундах, но разница между ними может быть меньше секунды. Это влияет на расчёт потребления ресурсов.
-rw-r--r--ydb/core/persqueue/metering_sink.cpp47
-rw-r--r--ydb/services/datastreams/datastreams_ut.cpp47
2 files changed, 54 insertions, 40 deletions
diff --git a/ydb/core/persqueue/metering_sink.cpp b/ydb/core/persqueue/metering_sink.cpp
index 8d982498ccf..0f1e6ef7287 100644
--- a/ydb/core/persqueue/metering_sink.cpp
+++ b/ydb/core/persqueue/metering_sink.cpp
@@ -69,9 +69,9 @@ bool TMeteringSink::IsCreated() const {
}
TString TMeteringSink::GetMeteringJson(const TString& metricBillingId, const TString& schemeName,
- const THashMap<TString, ui64>& tags,
- const TString& quantityUnit, ui64 quantity,
- TInstant start, TInstant end, TInstant now, const TString& version) {
+ const THashMap<TString, ui64>& tags,
+ const TString& quantityUnit, ui64 quantity,
+ TInstant start, TInstant end, TInstant now, const TString& version) {
MeteringCounter_.fetch_add(1);
TStringStream output;
NJson::TJsonWriter writer(&output, false);
@@ -238,22 +238,22 @@ void TMeteringSink::Flush(TInstant now, bool force) {
{"reserved_consumers_count", Parameters_.ConsumersCount}
};
auto interval = TInstant::Hours(LastFlush_[whichOne].Hours()) + Parameters_.FlushLimit;
- while (interval < now) {
+
+ auto tryFlush = [&](TInstant start, TInstant finish) {
const auto metricsJson = GetMeteringJson(
name, schema, tags, "second",
- Parameters_.PartitionsSize * (interval - LastFlush_[whichOne]).Seconds(),
- LastFlush_[whichOne], interval, now);
- LastFlush_[whichOne] = interval;
+ Parameters_.PartitionsSize * (finish.Seconds() - start.Seconds()),
+ start, finish, now);
FlushFunction_(metricsJson);
+ LastFlush_[whichOne] = finish;
+ };
+
+ while (interval < now) {
+ tryFlush(LastFlush_[whichOne], interval);
interval += Parameters_.FlushLimit;
}
if (LastFlush_[whichOne] < now) {
- const auto metricsJson = GetMeteringJson(
- name, schema, tags, "second",
- Parameters_.PartitionsSize * (now - LastFlush_[whichOne]).Seconds(),
- LastFlush_[whichOne], now, now);
- LastFlush_[whichOne] = now;
- FlushFunction_(metricsJson);
+ tryFlush(LastFlush_[whichOne], now);
}
}
break;
@@ -266,24 +266,23 @@ void TMeteringSink::Flush(TInstant now, bool force) {
const TString name = "yds.reserved_resources";
const TString schema = "yds.storage.reserved.v1";
auto interval = TInstant::Hours(LastFlush_[whichOne].Hours()) + Parameters_.FlushLimit;
- while (interval < now) {
+
+ auto tryFlush = [&](TInstant start, TInstant finish) {
const auto metricsJson = GetMeteringJson(
name, schema, {}, "mbyte*second",
Parameters_.PartitionsSize * (Parameters_.ReservedSpace / 1_MB) *
- (now - LastFlush_[whichOne]).Seconds(),
- LastFlush_[whichOne], interval, now);
- LastFlush_[whichOne] = interval;
+ (finish.Seconds() - start.Seconds()),
+ start, finish, now);
FlushFunction_(metricsJson);
+ LastFlush_[whichOne] = finish;
+ };
+
+ while (interval < now) {
+ tryFlush(LastFlush_[whichOne], interval);
interval += Parameters_.FlushLimit;
}
if (LastFlush_[whichOne] < now) {
- const auto metricsJson = GetMeteringJson(
- name, schema, {}, "mbyte*second",
- Parameters_.PartitionsSize * (Parameters_.ReservedSpace / 1_MB) *
- (now - LastFlush_[whichOne]).Seconds(),
- LastFlush_[whichOne], now, now);
- LastFlush_[whichOne] = now;
- FlushFunction_(metricsJson);
+ tryFlush(LastFlush_[whichOne], now);
}
}
break;
diff --git a/ydb/services/datastreams/datastreams_ut.cpp b/ydb/services/datastreams/datastreams_ut.cpp
index 60a182fc706..eaa46ca2a1e 100644
--- a/ydb/services/datastreams/datastreams_ut.cpp
+++ b/ydb/services/datastreams/datastreams_ut.cpp
@@ -476,17 +476,22 @@ Y_UNIT_TEST_SUITE(DataStreams) {
UNIT_ASSERT_VALUES_EQUAL(
labels.find("ydb_database")->second.GetString(), "root");
},
- [storageMb](const NJson::TJsonValue::TMapType& map) {
+ [](const NJson::TJsonValue::TMapType& map) {
UNIT_ASSERT(map.contains("usage"));
auto& usage = map.find("usage")->second.GetMap();
- UNIT_ASSERT_GT(usage.find("start")->second.GetUInteger(),
- TInstant::Now().Seconds() - 10);
- UNIT_ASSERT_GT(usage.find("finish")->second.GetUInteger(),
- TInstant::Now().Seconds() - 9);
+
+ auto start = usage.find("start")->second.GetUInteger();
+ auto finish = usage.find("finish")->second.GetUInteger();
+ UNIT_ASSERT_LE(start, finish);
+
+ auto now = TInstant::Now();
+ UNIT_ASSERT_GT(start, now.Seconds() - 10);
+ UNIT_ASSERT_GT(finish, now.Seconds() - 9);
UNIT_ASSERT_VALUES_EQUAL(usage.find("unit")->second.GetString(),
"mbyte*second");
+
UNIT_ASSERT_VALUES_EQUAL(usage.find("quantity")->second.GetUInteger(),
- storageMb);
+ storageMb * (finish - start));
});
UNIT_ASSERT_VALUES_EQUAL(storageSchemaFound, 8);
@@ -516,11 +521,16 @@ Y_UNIT_TEST_SUITE(DataStreams) {
[](const NJson::TJsonValue::TMapType& map) {
UNIT_ASSERT(map.contains("usage"));
auto& usage = map.find("usage")->second.GetMap();
- UNIT_ASSERT_VALUES_EQUAL(usage.find("quantity")->second.GetInteger(), 1);
- UNIT_ASSERT_GT(usage.find("start")->second.GetUInteger(),
- TInstant::Now().Seconds() - 10);
- UNIT_ASSERT_GT(usage.find("finish")->second.GetUInteger(),
- TInstant::Now().Seconds() - 9);
+
+ auto start = usage.find("start")->second.GetUInteger();
+ auto finish = usage.find("finish")->second.GetUInteger();
+ UNIT_ASSERT_LE(start, finish);
+
+ UNIT_ASSERT_VALUES_EQUAL(usage.find("quantity")->second.GetInteger(), finish - start);
+
+ auto now = TInstant::Now();
+ UNIT_ASSERT_GT(start, now.Seconds() - 10);
+ UNIT_ASSERT_GT(finish, now.Seconds() - 9);
UNIT_ASSERT_VALUES_EQUAL(usage.find("unit")->second.GetString(),
"second");
});
@@ -689,11 +699,16 @@ Y_UNIT_TEST_SUITE(DataStreams) {
[](const NJson::TJsonValue::TMapType& map) {
UNIT_ASSERT(map.contains("usage"));
auto& usage = map.find("usage")->second.GetMap();
- UNIT_ASSERT(usage.find("quantity")->second.GetInteger() <= 1);
- UNIT_ASSERT_GT(usage.find("start")->second.GetUInteger(),
- TInstant::Now().Seconds() - 10);
- UNIT_ASSERT_GT(usage.find("finish")->second.GetUInteger(),
- TInstant::Now().Seconds() - 9);
+
+ auto start = usage.find("start")->second.GetUInteger();
+ auto finish = usage.find("finish")->second.GetUInteger();
+ UNIT_ASSERT_LE(start, finish);
+
+ UNIT_ASSERT_VALUES_EQUAL(usage.find("quantity")->second.GetInteger(), finish - start);
+
+ auto now = TInstant::Now();
+ UNIT_ASSERT_GT(start, now.Seconds() - 10);
+ UNIT_ASSERT_GT(finish, now.Seconds() - 9);
UNIT_ASSERT_VALUES_EQUAL(usage.find("unit")->second.GetString(),
"second");
});