diff options
author | Alexey Efimov <xeno@ydb.tech> | 2024-04-08 16:24:03 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-04-08 16:24:03 +0200 |
commit | ba3d69df82d1e10f6bbac0f98da0217f2287692f (patch) | |
tree | 34e365f04edb661fcef5669e639d08f152f5cfed | |
parent | 284e4c328df8c7067f5c47cb6c271748e71d15e8 (diff) | |
download | ydb-ba3d69df82d1e10f6bbac0f98da0217f2287692f.tar.gz |
fix local db downsampling (#3487)
-rw-r--r-- | ydb/core/graph/shard/backends.cpp | 104 | ||||
-rw-r--r-- | ydb/core/graph/shard/backends.h | 4 | ||||
-rw-r--r-- | ydb/core/graph/ut/graph_ut.cpp | 14 |
3 files changed, 54 insertions, 68 deletions
diff --git a/ydb/core/graph/shard/backends.cpp b/ydb/core/graph/shard/backends.cpp index 2e8fdfe21af..0cd5af71342 100644 --- a/ydb/core/graph/shard/backends.cpp +++ b/ydb/core/graph/shard/backends.cpp @@ -392,89 +392,73 @@ bool TLocalBackend::ClearData(NTabletFlatExecutor::TTransactionContext& txc, TIn } bool TLocalBackend::DownsampleData(NTabletFlatExecutor::TTransactionContext& txc, TInstant now, const TAggregateSettings& settings) { + TInstant startTimestamp = TInstant::Seconds(settings.StartTimestamp.Seconds() / settings.SampleSize.Seconds() * settings.SampleSize.Seconds()); TInstant endTimestamp = now - settings.PeriodToStart; - BLOG_D("Downsample data from " << settings.StartTimestamp.Seconds() << " to " << endTimestamp.Seconds()); + BLOG_D("Downsample data from " << startTimestamp.Seconds() << " to " << endTimestamp.Seconds()); NIceDb::TNiceDb db(txc.DB); ui64 rows = 0; - auto rowset = db.Table<Schema::MetricsValues>().GreaterOrEqual(settings.StartTimestamp.Seconds()).LessOrEqual(endTimestamp.Seconds()).Select(); + auto rowset = db.Table<Schema::MetricsValues>().GreaterOrEqual(startTimestamp.Seconds()).LessOrEqual(endTimestamp.Seconds()).Select(); if (!rowset.IsReady()) { return false; } TMetricsValues values; std::unordered_set<ui64> ids; + ui64 beginSampleTimestamp = 0; + ui64 endSampleTimestamp = 0; ui64 prevTimestamp = 0; - ui64 stopTimestamp = 0; values.Values.resize(MetricsIndex.size()); while (!rowset.EndOfSet()) { ui64 timestamp = rowset.GetValue<Schema::MetricsValues::Timestamp>(); - if (timestamp != prevTimestamp) { - if (prevTimestamp == 0 || TDuration::Seconds(timestamp - prevTimestamp) < settings.SampleSize) { - if (stopTimestamp == 0) { - stopTimestamp = std::min(timestamp / settings.SampleSize.Seconds() * settings.SampleSize.Seconds() + settings.SampleSize.Seconds(), endTimestamp.Seconds()); + ui64 metricId = rowset.GetValue<Schema::MetricsValues::Id>(); + double value = rowset.GetValue<Schema::MetricsValues::Value>(); + if (beginSampleTimestamp == 0 || endSampleTimestamp == 0) { + beginSampleTimestamp = timestamp / settings.SampleSize.Seconds() * settings.SampleSize.Seconds(); + endSampleTimestamp = beginSampleTimestamp + settings.SampleSize.Seconds(); + } + if (timestamp >= endSampleTimestamp) { + if (values.Timestamps.size() > 1) { + for (TInstant ts : values.Timestamps) { + for (ui64 id : ids) { + db.Table<Schema::MetricsValues>().Key(ts.Seconds(), id).Delete(); + } } - if (timestamp >= stopTimestamp) { - if (values.Timestamps.size() > 1) { - for (TInstant ts : values.Timestamps) { - for (ui64 id : ids) { - db.Table<Schema::MetricsValues>().Key(ts.Seconds(), id).Delete(); - } - } - for (auto& val : values.Values) { - if (val.size() < values.Timestamps.size()) { - val.resize(values.Timestamps.size(), NAN); - } - } - BLOG_TRACE("Normalizing " << values.Timestamps.size() << " values from " << values.Timestamps.front().Seconds() - << " to " << values.Timestamps.back().Seconds()); - NormalizeAndDownsample(values, 1); + for (auto& val : values.Values) { + if (val.size() < values.Timestamps.size()) { + val.resize(values.Timestamps.size(), NAN); } - if (!values.Timestamps.empty()) { - BLOG_TRACE("Result time is " << values.Timestamps.front().Seconds()); - for (ui64 id : ids) { - if (!values.Values[id].empty()) { - BLOG_TRACE("Updating values with id " << id); - db.Table<Schema::MetricsValues>().Key(values.Timestamps.front().Seconds(), id).Update<Schema::MetricsValues::Value>(values.Values[id].front()); - } - } + Y_VERIFY(val.size() <= values.Timestamps.size()); + } + BLOG_TRACE("Normalizing " << values.Timestamps.size() << " values from " << values.Timestamps.front().Seconds() + << " to " << values.Timestamps.back().Seconds()); + NormalizeAndDownsample(values, 1); + } + if (!values.Timestamps.empty()) { + BLOG_TRACE("Result time is " << values.Timestamps.front().Seconds()); + for (ui64 id : ids) { + if (!values.Values[id].empty()) { + BLOG_TRACE("Updating values with id " << id); + db.Table<Schema::MetricsValues>().Key(values.Timestamps.front().Seconds(), id).Update<Schema::MetricsValues::Value>(values.Values[id].front()); } - ids.clear(); - values.Clear(); - values.Values.resize(MetricsIndex.size()); - stopTimestamp = std::min(stopTimestamp + settings.SampleSize.Seconds(), endTimestamp.Seconds()) / settings.SampleSize.Seconds() * settings.SampleSize.Seconds(); } - values.Timestamps.emplace_back(TInstant::Seconds(timestamp)); - ui64 id = rowset.GetValue<Schema::MetricsValues::Id>(); - double value = rowset.GetValue<Schema::MetricsValues::Value>(); - values.Values[id].push_back(value); - ids.insert(id); } - } else { - ui64 id = rowset.GetValue<Schema::MetricsValues::Id>(); - double value = rowset.GetValue<Schema::MetricsValues::Value>(); - values.Values[id].push_back(value); + settings.StartTimestamp = TInstant::Seconds(endSampleTimestamp); + ids.clear(); + values.Clear(); + beginSampleTimestamp = timestamp / settings.SampleSize.Seconds() * settings.SampleSize.Seconds(); + endSampleTimestamp = beginSampleTimestamp + settings.SampleSize.Seconds(); + prevTimestamp = 0; } + if (timestamp != prevTimestamp) { + values.Timestamps.emplace_back(TInstant::Seconds(timestamp)); + } + values.Values[metricId].push_back(value); + ids.insert(metricId); + Y_VERIFY(values.Values[metricId].size() <= values.Timestamps.size()); prevTimestamp = timestamp; if (!rowset.Next()) { return false; } } - - if (values.Timestamps.size() > 1) { - BLOG_TRACE("Normalizing " << values.Timestamps.size() << " values from " << values.Timestamps.front().Seconds() - << " to " << values.Timestamps.back().Seconds()); - NormalizeAndDownsample(values, 1); - } - if (!values.Timestamps.empty()) { - BLOG_TRACE("Result time is " << values.Timestamps.front().Seconds()); - for (ui64 id : ids) { - if (!values.Values[id].empty()) { - BLOG_TRACE("Updating values with id " << id); - db.Table<Schema::MetricsValues>().Key(values.Timestamps.front().Seconds(), id).Update<Schema::MetricsValues::Value>(values.Values[id].front()); - } - } - } - - settings.StartTimestamp = TInstant::Seconds(prevTimestamp); BLOG_D("Downsampled " << rows << " logical rows"); return true; } diff --git a/ydb/core/graph/shard/backends.h b/ydb/core/graph/shard/backends.h index a1221327d81..ef63740506f 100644 --- a/ydb/core/graph/shard/backends.h +++ b/ydb/core/graph/shard/backends.h @@ -49,7 +49,9 @@ public: void Clear() { Timestamps.clear(); - Values.clear(); + for (auto& value : Values) { + value.clear(); + } } }; diff --git a/ydb/core/graph/ut/graph_ut.cpp b/ydb/core/graph/ut/graph_ut.cpp index da101106b4a..92794fcc6fa 100644 --- a/ydb/core/graph/ut/graph_ut.cpp +++ b/ydb/core/graph/ut/graph_ut.cpp @@ -410,15 +410,15 @@ Y_UNIT_TEST_SUITE(Graph) { TAutoPtr<IEventHandle> handle; NGraph::TEvGraph::TEvMetricsResult* response = runtime.GrabEdgeEventRethrow<NGraph::TEvGraph::TEvMetricsResult>(handle); Ctest << response->Record.ShortDebugString() << Endl; - UNIT_ASSERT_VALUES_EQUAL(response->Record.TimeSize(), 26); + UNIT_ASSERT_VALUES_EQUAL(response->Record.TimeSize(), 20); UNIT_ASSERT_VALUES_EQUAL(response->Record.GetTime(0), 79); - UNIT_ASSERT_VALUES_EQUAL(response->Record.GetTime(1), 80); - UNIT_ASSERT_VALUES_EQUAL(response->Record.GetTime(2), 89); - UNIT_ASSERT_VALUES_EQUAL(response->Record.GetTime(3), 94); - UNIT_ASSERT_VALUES_EQUAL(response->Record.GetTime(4), 99); + UNIT_ASSERT_VALUES_EQUAL(response->Record.GetTime(1), 89); + UNIT_ASSERT_VALUES_EQUAL(response->Record.GetTime(2), 94); + UNIT_ASSERT_VALUES_EQUAL(response->Record.GetTime(3), 99); + UNIT_ASSERT_VALUES_EQUAL(response->Record.GetTime(4), 104); UNIT_ASSERT(abs(response->Record.GetData(0).GetValues(0) - 14.5) < 0.1); - UNIT_ASSERT(abs(response->Record.GetData(0).GetValues(1) - 20.0) < 0.1); - UNIT_ASSERT(abs(response->Record.GetData(0).GetValues(2) - 23.6) < 0.1); + UNIT_ASSERT(abs(response->Record.GetData(0).GetValues(1) - 24.5) < 0.1); + UNIT_ASSERT(abs(response->Record.GetData(0).GetValues(2) - 32) < 0.1); } } |