aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlexey Efimov <xeno@ydb.tech>2024-04-08 16:24:03 +0200
committerGitHub <noreply@github.com>2024-04-08 16:24:03 +0200
commitba3d69df82d1e10f6bbac0f98da0217f2287692f (patch)
tree34e365f04edb661fcef5669e639d08f152f5cfed
parent284e4c328df8c7067f5c47cb6c271748e71d15e8 (diff)
downloadydb-ba3d69df82d1e10f6bbac0f98da0217f2287692f.tar.gz
fix local db downsampling (#3487)
-rw-r--r--ydb/core/graph/shard/backends.cpp104
-rw-r--r--ydb/core/graph/shard/backends.h4
-rw-r--r--ydb/core/graph/ut/graph_ut.cpp14
3 files changed, 54 insertions, 68 deletions
diff --git a/ydb/core/graph/shard/backends.cpp b/ydb/core/graph/shard/backends.cpp
index 2e8fdfe21af..0cd5af71342 100644
--- a/ydb/core/graph/shard/backends.cpp
+++ b/ydb/core/graph/shard/backends.cpp
@@ -392,89 +392,73 @@ bool TLocalBackend::ClearData(NTabletFlatExecutor::TTransactionContext& txc, TIn
}
bool TLocalBackend::DownsampleData(NTabletFlatExecutor::TTransactionContext& txc, TInstant now, const TAggregateSettings& settings) {
+ TInstant startTimestamp = TInstant::Seconds(settings.StartTimestamp.Seconds() / settings.SampleSize.Seconds() * settings.SampleSize.Seconds());
TInstant endTimestamp = now - settings.PeriodToStart;
- BLOG_D("Downsample data from " << settings.StartTimestamp.Seconds() << " to " << endTimestamp.Seconds());
+ BLOG_D("Downsample data from " << startTimestamp.Seconds() << " to " << endTimestamp.Seconds());
NIceDb::TNiceDb db(txc.DB);
ui64 rows = 0;
- auto rowset = db.Table<Schema::MetricsValues>().GreaterOrEqual(settings.StartTimestamp.Seconds()).LessOrEqual(endTimestamp.Seconds()).Select();
+ auto rowset = db.Table<Schema::MetricsValues>().GreaterOrEqual(startTimestamp.Seconds()).LessOrEqual(endTimestamp.Seconds()).Select();
if (!rowset.IsReady()) {
return false;
}
TMetricsValues values;
std::unordered_set<ui64> ids;
+ ui64 beginSampleTimestamp = 0;
+ ui64 endSampleTimestamp = 0;
ui64 prevTimestamp = 0;
- ui64 stopTimestamp = 0;
values.Values.resize(MetricsIndex.size());
while (!rowset.EndOfSet()) {
ui64 timestamp = rowset.GetValue<Schema::MetricsValues::Timestamp>();
- if (timestamp != prevTimestamp) {
- if (prevTimestamp == 0 || TDuration::Seconds(timestamp - prevTimestamp) < settings.SampleSize) {
- if (stopTimestamp == 0) {
- stopTimestamp = std::min(timestamp / settings.SampleSize.Seconds() * settings.SampleSize.Seconds() + settings.SampleSize.Seconds(), endTimestamp.Seconds());
+ ui64 metricId = rowset.GetValue<Schema::MetricsValues::Id>();
+ double value = rowset.GetValue<Schema::MetricsValues::Value>();
+ if (beginSampleTimestamp == 0 || endSampleTimestamp == 0) {
+ beginSampleTimestamp = timestamp / settings.SampleSize.Seconds() * settings.SampleSize.Seconds();
+ endSampleTimestamp = beginSampleTimestamp + settings.SampleSize.Seconds();
+ }
+ if (timestamp >= endSampleTimestamp) {
+ if (values.Timestamps.size() > 1) {
+ for (TInstant ts : values.Timestamps) {
+ for (ui64 id : ids) {
+ db.Table<Schema::MetricsValues>().Key(ts.Seconds(), id).Delete();
+ }
}
- if (timestamp >= stopTimestamp) {
- if (values.Timestamps.size() > 1) {
- for (TInstant ts : values.Timestamps) {
- for (ui64 id : ids) {
- db.Table<Schema::MetricsValues>().Key(ts.Seconds(), id).Delete();
- }
- }
- for (auto& val : values.Values) {
- if (val.size() < values.Timestamps.size()) {
- val.resize(values.Timestamps.size(), NAN);
- }
- }
- BLOG_TRACE("Normalizing " << values.Timestamps.size() << " values from " << values.Timestamps.front().Seconds()
- << " to " << values.Timestamps.back().Seconds());
- NormalizeAndDownsample(values, 1);
+ for (auto& val : values.Values) {
+ if (val.size() < values.Timestamps.size()) {
+ val.resize(values.Timestamps.size(), NAN);
}
- if (!values.Timestamps.empty()) {
- BLOG_TRACE("Result time is " << values.Timestamps.front().Seconds());
- for (ui64 id : ids) {
- if (!values.Values[id].empty()) {
- BLOG_TRACE("Updating values with id " << id);
- db.Table<Schema::MetricsValues>().Key(values.Timestamps.front().Seconds(), id).Update<Schema::MetricsValues::Value>(values.Values[id].front());
- }
- }
+ Y_VERIFY(val.size() <= values.Timestamps.size());
+ }
+ BLOG_TRACE("Normalizing " << values.Timestamps.size() << " values from " << values.Timestamps.front().Seconds()
+ << " to " << values.Timestamps.back().Seconds());
+ NormalizeAndDownsample(values, 1);
+ }
+ if (!values.Timestamps.empty()) {
+ BLOG_TRACE("Result time is " << values.Timestamps.front().Seconds());
+ for (ui64 id : ids) {
+ if (!values.Values[id].empty()) {
+ BLOG_TRACE("Updating values with id " << id);
+ db.Table<Schema::MetricsValues>().Key(values.Timestamps.front().Seconds(), id).Update<Schema::MetricsValues::Value>(values.Values[id].front());
}
- ids.clear();
- values.Clear();
- values.Values.resize(MetricsIndex.size());
- stopTimestamp = std::min(stopTimestamp + settings.SampleSize.Seconds(), endTimestamp.Seconds()) / settings.SampleSize.Seconds() * settings.SampleSize.Seconds();
}
- values.Timestamps.emplace_back(TInstant::Seconds(timestamp));
- ui64 id = rowset.GetValue<Schema::MetricsValues::Id>();
- double value = rowset.GetValue<Schema::MetricsValues::Value>();
- values.Values[id].push_back(value);
- ids.insert(id);
}
- } else {
- ui64 id = rowset.GetValue<Schema::MetricsValues::Id>();
- double value = rowset.GetValue<Schema::MetricsValues::Value>();
- values.Values[id].push_back(value);
+ settings.StartTimestamp = TInstant::Seconds(endSampleTimestamp);
+ ids.clear();
+ values.Clear();
+ beginSampleTimestamp = timestamp / settings.SampleSize.Seconds() * settings.SampleSize.Seconds();
+ endSampleTimestamp = beginSampleTimestamp + settings.SampleSize.Seconds();
+ prevTimestamp = 0;
}
+ if (timestamp != prevTimestamp) {
+ values.Timestamps.emplace_back(TInstant::Seconds(timestamp));
+ }
+ values.Values[metricId].push_back(value);
+ ids.insert(metricId);
+ Y_VERIFY(values.Values[metricId].size() <= values.Timestamps.size());
prevTimestamp = timestamp;
if (!rowset.Next()) {
return false;
}
}
-
- if (values.Timestamps.size() > 1) {
- BLOG_TRACE("Normalizing " << values.Timestamps.size() << " values from " << values.Timestamps.front().Seconds()
- << " to " << values.Timestamps.back().Seconds());
- NormalizeAndDownsample(values, 1);
- }
- if (!values.Timestamps.empty()) {
- BLOG_TRACE("Result time is " << values.Timestamps.front().Seconds());
- for (ui64 id : ids) {
- if (!values.Values[id].empty()) {
- BLOG_TRACE("Updating values with id " << id);
- db.Table<Schema::MetricsValues>().Key(values.Timestamps.front().Seconds(), id).Update<Schema::MetricsValues::Value>(values.Values[id].front());
- }
- }
- }
-
- settings.StartTimestamp = TInstant::Seconds(prevTimestamp);
BLOG_D("Downsampled " << rows << " logical rows");
return true;
}
diff --git a/ydb/core/graph/shard/backends.h b/ydb/core/graph/shard/backends.h
index a1221327d81..ef63740506f 100644
--- a/ydb/core/graph/shard/backends.h
+++ b/ydb/core/graph/shard/backends.h
@@ -49,7 +49,9 @@ public:
void Clear() {
Timestamps.clear();
- Values.clear();
+ for (auto& value : Values) {
+ value.clear();
+ }
}
};
diff --git a/ydb/core/graph/ut/graph_ut.cpp b/ydb/core/graph/ut/graph_ut.cpp
index da101106b4a..92794fcc6fa 100644
--- a/ydb/core/graph/ut/graph_ut.cpp
+++ b/ydb/core/graph/ut/graph_ut.cpp
@@ -410,15 +410,15 @@ Y_UNIT_TEST_SUITE(Graph) {
TAutoPtr<IEventHandle> handle;
NGraph::TEvGraph::TEvMetricsResult* response = runtime.GrabEdgeEventRethrow<NGraph::TEvGraph::TEvMetricsResult>(handle);
Ctest << response->Record.ShortDebugString() << Endl;
- UNIT_ASSERT_VALUES_EQUAL(response->Record.TimeSize(), 26);
+ UNIT_ASSERT_VALUES_EQUAL(response->Record.TimeSize(), 20);
UNIT_ASSERT_VALUES_EQUAL(response->Record.GetTime(0), 79);
- UNIT_ASSERT_VALUES_EQUAL(response->Record.GetTime(1), 80);
- UNIT_ASSERT_VALUES_EQUAL(response->Record.GetTime(2), 89);
- UNIT_ASSERT_VALUES_EQUAL(response->Record.GetTime(3), 94);
- UNIT_ASSERT_VALUES_EQUAL(response->Record.GetTime(4), 99);
+ UNIT_ASSERT_VALUES_EQUAL(response->Record.GetTime(1), 89);
+ UNIT_ASSERT_VALUES_EQUAL(response->Record.GetTime(2), 94);
+ UNIT_ASSERT_VALUES_EQUAL(response->Record.GetTime(3), 99);
+ UNIT_ASSERT_VALUES_EQUAL(response->Record.GetTime(4), 104);
UNIT_ASSERT(abs(response->Record.GetData(0).GetValues(0) - 14.5) < 0.1);
- UNIT_ASSERT(abs(response->Record.GetData(0).GetValues(1) - 20.0) < 0.1);
- UNIT_ASSERT(abs(response->Record.GetData(0).GetValues(2) - 23.6) < 0.1);
+ UNIT_ASSERT(abs(response->Record.GetData(0).GetValues(1) - 24.5) < 0.1);
+ UNIT_ASSERT(abs(response->Record.GetData(0).GetValues(2) - 32) < 0.1);
}
}