summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author: kardymon-d <[email protected]> 2023-11-01 22:15:31 +0300
committer: kardymon-d <[email protected]> 2023-11-01 22:33:52 +0300
commit: c333d60e1d079deb6549e0517601c1c164eb34f0 (patch)
tree: 25d22a7ba5e7c330f039f54d320df20282d1d665
parent: 273ae76ffb334e84101101e209963a00147daa04 (diff)
TimeOrderRecoverer: не перемешивать события с одинаковыми временами
Add counter to time_order_recoverer to prevent reordering
-rw-r--r-- ydb/library/yql/minikql/comp_nodes/mkql_time_order_recover.cpp | 17
-rw-r--r-- ydb/library/yql/minikql/comp_nodes/ut/mkql_time_order_recover_saveload_ut.cpp | 12
-rw-r--r-- ydb/library/yql/tests/s-expressions/suites/TimeOrderRecover/input_ordering.txt | 15
-rw-r--r-- ydb/library/yql/tests/s-expressions/suites/TimeOrderRecover/input_ordering.txt.attr | 6
-rw-r--r-- ydb/library/yql/tests/s-expressions/suites/TimeOrderRecover/ordering.cfg | 1
-rw-r--r-- ydb/library/yql/tests/s-expressions/suites/TimeOrderRecover/ordering.yql | 14
-rw-r--r-- ydb/library/yql/tests/s-expressions/yt_native_file/part8/canondata/result.json | 21
7 files changed, 79 insertions, 7 deletions
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_time_order_recover.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_time_order_recover.cpp
index 3c104f1c2a4..ee8aebbfddd 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_time_order_recover.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_time_order_recover.cpp
@@ -36,11 +36,13 @@ public:
, RowLimit(rowLimit + 1)
, Latest(0)
, Terminating(false)
+ , MonotonicCounter(0)
, Ctx(ctx)
{}
private:
- using TEntry = std::pair<TTimestamp, NUdf::TUnboxedValue>;
+ using THeapKey = std::pair<TTimestamp, ui64>;
+ using TEntry = std::pair<THeapKey, NUdf::TUnboxedValue>;
static constexpr auto Greater = [](const TEntry& lhs, const TEntry& rhs) {
return lhs.first > rhs.first;
};
@@ -68,7 +70,8 @@ public:
if (Heap.empty()) {
return NUdf::TUnboxedValue{};
}
- TTimestamp oldest = Heap.top().first;
+ THeapKey oldestKey = Heap.top().first;
+ TTimestamp oldest = oldestKey.first;
if (oldest < Latest + Delay || Heap.size() == RowLimit || Terminating) {
auto result = std::move(Heap.top().second);
Heap.pop();
@@ -84,7 +87,7 @@ public:
Latest = t;
}
if (Latest + Delay < t && t < Latest + Ahead) {
- Heap.emplace(t, std::move(row));
+ Heap.emplace(THeapKey(t, ++MonotonicCounter), std::move(row));
} else {
return row;
}
@@ -105,8 +108,9 @@ public:
ClearState();
for (auto i = 0U; i < heapSize; ++i) {
TTimestamp t = ReadUi64(in);
+ MonotonicCounter = ReadUi64(in);
NUdf::TUnboxedValue row = ReadUnboxedValue(in, Self->Packer.RefMutableObject(Ctx, false, Self->StateType), Ctx);
- Heap.emplace(t, std::move(row));
+ Heap.emplace(THeapKey(t, MonotonicCounter), std::move(row));
}
Latest = ReadUi64(in);
Terminating = ReadBool(in);
@@ -121,7 +125,9 @@ public:
WriteUi32(out, Heap.size());
for (const TEntry& entry : Heap) {
- WriteUi64(out, entry.first);
+ THeapKey key = entry.first;
+ WriteUi64(out, key.first);
+ WriteUi64(out, key.second);
WriteUnboxedValue(out, Self->Packer.RefMutableObject(Ctx, false, Self->StateType), entry.second);
}
WriteUi64(out, Latest);
@@ -144,6 +150,7 @@ public:
const ui32 RowLimit;
TTimestamp Latest;
bool Terminating; //not applicable for streams, but useful for debug and testing
+ ui64 MonotonicCounter;
TComputationContext& Ctx;
};
diff --git a/ydb/library/yql/minikql/comp_nodes/ut/mkql_time_order_recover_saveload_ut.cpp b/ydb/library/yql/minikql/comp_nodes/ut/mkql_time_order_recover_saveload_ut.cpp
index 5d278e8c4eb..2b9979c6982 100644
--- a/ydb/library/yql/minikql/comp_nodes/ut/mkql_time_order_recover_saveload_ut.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/ut/mkql_time_order_recover_saveload_ut.cpp
@@ -143,13 +143,21 @@ namespace NKikimr {
const std::vector<std::tuple<ui32, i64, ui32>> input = {
// Group; Time; Value
- {1000, 800, 100},
+ {1000, 800, 101},
+ {1000, 800, 102},
+ {1000, 800, 103},
+ {1000, 800, 104},
+ {1000, 800, 105},
{2000, 802, 300},
{3000, 801, 200}};
const std::vector<std::tuple<ui32, i64, ui32>> expected = {
// Group; Time; Value
- {1000, 800, 100},
+ {1000, 800, 101},
+ {1000, 800, 102},
+ {1000, 800, 103},
+ {1000, 800, 104},
+ {1000, 800, 105},
{3000, 801, 200},
{2000, 802, 300}};
diff --git a/ydb/library/yql/tests/s-expressions/suites/TimeOrderRecover/input_ordering.txt b/ydb/library/yql/tests/s-expressions/suites/TimeOrderRecover/input_ordering.txt
new file mode 100644
index 00000000000..1363c953b67
--- /dev/null
+++ b/ydb/library/yql/tests/s-expressions/suites/TimeOrderRecover/input_ordering.txt
@@ -0,0 +1,15 @@
+{"ts"=1000u; "col1"="a1"};
+{"ts"=1000u; "col1"="a2"};
+{"ts"=1000u; "col1"="a3"};
+{"ts"=1000u; "col1"="a4"};
+{"ts"=1000u; "col1"="a5"};
+{"ts"=1001u; "col1"="b1"};
+{"ts"=1001u; "col1"="b2"};
+{"ts"=1001u; "col1"="b3"};
+{"ts"=1001u; "col1"="b4"};
+{"ts"=1001u; "col1"="b5"};
+{"ts"=1002u; "col1"="c1"};
+{"ts"=1002u; "col1"="c2"};
+{"ts"=1002u; "col1"="c3"};
+{"ts"=1002u; "col1"="c4"};
+{"ts"=1002u; "col1"="c5"};
\ No newline at end of file
diff --git a/ydb/library/yql/tests/s-expressions/suites/TimeOrderRecover/input_ordering.txt.attr b/ydb/library/yql/tests/s-expressions/suites/TimeOrderRecover/input_ordering.txt.attr
new file mode 100644
index 00000000000..63d77d68121
--- /dev/null
+++ b/ydb/library/yql/tests/s-expressions/suites/TimeOrderRecover/input_ordering.txt.attr
@@ -0,0 +1,6 @@
+{"_yql_row_spec"={
+ "Type"=["StructType";[
+ ["ts";["DataType";"Timestamp"]];
+ ["col1";["DataType";"String"]];
+ ]];
+}}
diff --git a/ydb/library/yql/tests/s-expressions/suites/TimeOrderRecover/ordering.cfg b/ydb/library/yql/tests/s-expressions/suites/TimeOrderRecover/ordering.cfg
new file mode 100644
index 00000000000..144fceddb1c
--- /dev/null
+++ b/ydb/library/yql/tests/s-expressions/suites/TimeOrderRecover/ordering.cfg
@@ -0,0 +1 @@
+in Input input_ordering.txt
diff --git a/ydb/library/yql/tests/s-expressions/suites/TimeOrderRecover/ordering.yql b/ydb/library/yql/tests/s-expressions/suites/TimeOrderRecover/ordering.yql
new file mode 100644
index 00000000000..840af032400
--- /dev/null
+++ b/ydb/library/yql/tests/s-expressions/suites/TimeOrderRecover/ordering.yql
@@ -0,0 +1,14 @@
+(
+(let $1 (DataSink 'result))
+(let $2 (Right! (Read! world (DataSource '"yt" '"plato") (MrTableConcat (Key '('table (String '"Input")))) (Void) '())))
+(let inputFlow (ToFlow (PersistableRepr (OrderedSqlProject $2 '((SqlProjectStarItem (TypeOf $2) '"" (lambda '($4) $4) '()))))))
+(let timeOrderRecover (TimeOrderRecover
+ inputFlow
+ (lambda '($3) (Member $3 '"ts"))
+ (Interval '-10)
+ (Interval '10)
+ (Uint32 '100)
+))
+(let $3 (Write! world $1 (Key)(ForwardList timeOrderRecover)'('('type) '('autoref))))
+(return (CommitAll! (Commit! $3 $1)))
+)
diff --git a/ydb/library/yql/tests/s-expressions/yt_native_file/part8/canondata/result.json b/ydb/library/yql/tests/s-expressions/yt_native_file/part8/canondata/result.json
index 1d6e2f8ac6e..57f1086bba7 100644
--- a/ydb/library/yql/tests/s-expressions/yt_native_file/part8/canondata/result.json
+++ b/ydb/library/yql/tests/s-expressions/yt_native_file/part8/canondata/result.json
@@ -4662,6 +4662,27 @@
"uri": "https://storage.yandex-team.ru/get-devtools/995452/f98a01db24bf1dc14fb54bf2bf3c8e5fca8967c6/resource.tar.gz#test.test_TimeOrderRecover-happy_path-Results_/results.txt"
}
],
+ "test.test[TimeOrderRecover-ordering-Debug]": [
+ {
+ "checksum": "dd979fe0ec7bb6cdf1ce205cafdfd521",
+ "size": 1052,
+ "uri": "https://storage.yandex-team.ru/get-devtools/1942100/3416defdd21f52f2fd0e0428318d22400ab77887/resource.tar.gz#test.test_TimeOrderRecover-ordering-Debug_/opt.yql"
+ }
+ ],
+ "test.test[TimeOrderRecover-ordering-Plan]": [
+ {
+ "checksum": "f67bc0645fae0844adcd0a4839183932",
+ "size": 2177,
+ "uri": "https://storage.yandex-team.ru/get-devtools/1900335/304a2080ef213199ea3cc747f04c6cdadad8c6ba/resource.tar.gz#test.test_TimeOrderRecover-ordering-Plan_/plan.txt"
+ }
+ ],
+ "test.test[TimeOrderRecover-ordering-Results]": [
+ {
+ "checksum": "714788c5fd05a14f765c99a804f434dc",
+ "size": 3194,
+ "uri": "https://storage.yandex-team.ru/get-devtools/1942671/1db4d4843d1839221ea8ae53e65e00062ca57f97/resource.tar.gz#test.test_TimeOrderRecover-ordering-Results_/results.txt"
+ }
+ ],
"test.test[Udf-AutoMapManyNamed-Debug]": [
{
"checksum": "7ebb3a85fba24cde1eb2feee9c7196bd",