diff options
| author | kardymon-d <[email protected]> | 2023-11-01 22:15:31 +0300 |
|---|---|---|
| committer | kardymon-d <[email protected]> | 2023-11-01 22:33:52 +0300 |
| commit | c333d60e1d079deb6549e0517601c1c164eb34f0 (patch) | |
| tree | 25d22a7ba5e7c330f039f54d320df20282d1d665 | |
| parent | 273ae76ffb334e84101101e209963a00147daa04 (diff) | |
TimeOrderRecoverer: не перемешивать события с одинаковыми временами
Add counter to time_order_recoverer to prevent reordering
7 files changed, 79 insertions, 7 deletions
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_time_order_recover.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_time_order_recover.cpp index 3c104f1c2a4..ee8aebbfddd 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_time_order_recover.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_time_order_recover.cpp @@ -36,11 +36,13 @@ public: , RowLimit(rowLimit + 1) , Latest(0) , Terminating(false) + , MonotonicCounter(0) , Ctx(ctx) {} private: - using TEntry = std::pair<TTimestamp, NUdf::TUnboxedValue>; + using THeapKey = std::pair<TTimestamp, ui64>; + using TEntry = std::pair<THeapKey, NUdf::TUnboxedValue>; static constexpr auto Greater = [](const TEntry& lhs, const TEntry& rhs) { return lhs.first > rhs.first; }; @@ -68,7 +70,8 @@ public: if (Heap.empty()) { return NUdf::TUnboxedValue{}; } - TTimestamp oldest = Heap.top().first; + THeapKey oldestKey = Heap.top().first; + TTimestamp oldest = oldestKey.first; if (oldest < Latest + Delay || Heap.size() == RowLimit || Terminating) { auto result = std::move(Heap.top().second); Heap.pop(); @@ -84,7 +87,7 @@ public: Latest = t; } if (Latest + Delay < t && t < Latest + Ahead) { - Heap.emplace(t, std::move(row)); + Heap.emplace(THeapKey(t, ++MonotonicCounter), std::move(row)); } else { return row; } @@ -105,8 +108,9 @@ public: ClearState(); for (auto i = 0U; i < heapSize; ++i) { TTimestamp t = ReadUi64(in); + MonotonicCounter = ReadUi64(in); NUdf::TUnboxedValue row = ReadUnboxedValue(in, Self->Packer.RefMutableObject(Ctx, false, Self->StateType), Ctx); - Heap.emplace(t, std::move(row)); + Heap.emplace(THeapKey(t, MonotonicCounter), std::move(row)); } Latest = ReadUi64(in); Terminating = ReadBool(in); @@ -121,7 +125,9 @@ public: WriteUi32(out, Heap.size()); for (const TEntry& entry : Heap) { - WriteUi64(out, entry.first); + THeapKey key = entry.first; + WriteUi64(out, key.first); + WriteUi64(out, key.second); WriteUnboxedValue(out, Self->Packer.RefMutableObject(Ctx, false, Self->StateType), entry.second); } WriteUi64(out, Latest); @@ -144,6 +150,7 @@ public: const ui32 RowLimit; TTimestamp Latest; bool Terminating; //not applicable for streams, but useful for debug and testing + ui64 MonotonicCounter; TComputationContext& Ctx; }; diff --git a/ydb/library/yql/minikql/comp_nodes/ut/mkql_time_order_recover_saveload_ut.cpp b/ydb/library/yql/minikql/comp_nodes/ut/mkql_time_order_recover_saveload_ut.cpp index 5d278e8c4eb..2b9979c6982 100644 --- a/ydb/library/yql/minikql/comp_nodes/ut/mkql_time_order_recover_saveload_ut.cpp +++ b/ydb/library/yql/minikql/comp_nodes/ut/mkql_time_order_recover_saveload_ut.cpp @@ -143,13 +143,21 @@ namespace NKikimr { const std::vector<std::tuple<ui32, i64, ui32>> input = { // Group; Time; Value - {1000, 800, 100}, + {1000, 800, 101}, + {1000, 800, 102}, + {1000, 800, 103}, + {1000, 800, 104}, + {1000, 800, 105}, {2000, 802, 300}, {3000, 801, 200}}; const std::vector<std::tuple<ui32, i64, ui32>> expected = { // Group; Time; Value - {1000, 800, 100}, + {1000, 800, 101}, + {1000, 800, 102}, + {1000, 800, 103}, + {1000, 800, 104}, + {1000, 800, 105}, {3000, 801, 200}, {2000, 802, 300}}; diff --git a/ydb/library/yql/tests/s-expressions/suites/TimeOrderRecover/input_ordering.txt b/ydb/library/yql/tests/s-expressions/suites/TimeOrderRecover/input_ordering.txt new file mode 100644 index 00000000000..1363c953b67 --- /dev/null +++ b/ydb/library/yql/tests/s-expressions/suites/TimeOrderRecover/input_ordering.txt @@ -0,0 +1,15 @@ +{"ts"=1000u; "col1"="a1"}; +{"ts"=1000u; "col1"="a2"}; +{"ts"=1000u; "col1"="a3"}; +{"ts"=1000u; "col1"="a4"}; +{"ts"=1000u; "col1"="a5"}; +{"ts"=1001u; "col1"="b1"}; +{"ts"=1001u; "col1"="b2"}; +{"ts"=1001u; "col1"="b3"}; +{"ts"=1001u; "col1"="b4"}; +{"ts"=1001u; "col1"="b5"}; +{"ts"=1002u; "col1"="c1"}; +{"ts"=1002u; "col1"="c2"}; +{"ts"=1002u; "col1"="c3"}; +{"ts"=1002u; "col1"="c4"}; +{"ts"=1002u; "col1"="c5"};
\ No newline at end of file diff --git a/ydb/library/yql/tests/s-expressions/suites/TimeOrderRecover/input_ordering.txt.attr b/ydb/library/yql/tests/s-expressions/suites/TimeOrderRecover/input_ordering.txt.attr new file mode 100644 index 00000000000..63d77d68121 --- /dev/null +++ b/ydb/library/yql/tests/s-expressions/suites/TimeOrderRecover/input_ordering.txt.attr @@ -0,0 +1,6 @@ +{"_yql_row_spec"={ + "Type"=["StructType";[ + ["ts";["DataType";"Timestamp"]]; + ["col1";["DataType";"String"]]; + ]]; +}} diff --git a/ydb/library/yql/tests/s-expressions/suites/TimeOrderRecover/ordering.cfg b/ydb/library/yql/tests/s-expressions/suites/TimeOrderRecover/ordering.cfg new file mode 100644 index 00000000000..144fceddb1c --- /dev/null +++ b/ydb/library/yql/tests/s-expressions/suites/TimeOrderRecover/ordering.cfg @@ -0,0 +1 @@ +in Input input_ordering.txt diff --git a/ydb/library/yql/tests/s-expressions/suites/TimeOrderRecover/ordering.yql b/ydb/library/yql/tests/s-expressions/suites/TimeOrderRecover/ordering.yql new file mode 100644 index 00000000000..840af032400 --- /dev/null +++ b/ydb/library/yql/tests/s-expressions/suites/TimeOrderRecover/ordering.yql @@ -0,0 +1,14 @@ +( +(let $1 (DataSink 'result)) +(let $2 (Right! (Read! world (DataSource '"yt" '"plato") (MrTableConcat (Key '('table (String '"Input")))) (Void) '()))) +(let inputFlow (ToFlow (PersistableRepr (OrderedSqlProject $2 '((SqlProjectStarItem (TypeOf $2) '"" (lambda '($4) $4) '())))))) +(let timeOrderRecover (TimeOrderRecover + inputFlow + (lambda '($3) (Member $3 '"ts")) + (Interval '-10) + (Interval '10) + (Uint32 '100) +)) +(let $3 (Write! world $1 (Key)(ForwardList timeOrderRecover)'('('type) '('autoref)))) +(return (CommitAll! (Commit! $3 $1))) +) diff --git a/ydb/library/yql/tests/s-expressions/yt_native_file/part8/canondata/result.json b/ydb/library/yql/tests/s-expressions/yt_native_file/part8/canondata/result.json index 1d6e2f8ac6e..57f1086bba7 100644 --- a/ydb/library/yql/tests/s-expressions/yt_native_file/part8/canondata/result.json +++ b/ydb/library/yql/tests/s-expressions/yt_native_file/part8/canondata/result.json @@ -4662,6 +4662,27 @@ "uri": "https://storage.yandex-team.ru/get-devtools/995452/f98a01db24bf1dc14fb54bf2bf3c8e5fca8967c6/resource.tar.gz#test.test_TimeOrderRecover-happy_path-Results_/results.txt" } ], + "test.test[TimeOrderRecover-ordering-Debug]": [ + { + "checksum": "dd979fe0ec7bb6cdf1ce205cafdfd521", + "size": 1052, + "uri": "https://storage.yandex-team.ru/get-devtools/1942100/3416defdd21f52f2fd0e0428318d22400ab77887/resource.tar.gz#test.test_TimeOrderRecover-ordering-Debug_/opt.yql" + } + ], + "test.test[TimeOrderRecover-ordering-Plan]": [ + { + "checksum": "f67bc0645fae0844adcd0a4839183932", + "size": 2177, + "uri": "https://storage.yandex-team.ru/get-devtools/1900335/304a2080ef213199ea3cc747f04c6cdadad8c6ba/resource.tar.gz#test.test_TimeOrderRecover-ordering-Plan_/plan.txt" + } + ], + "test.test[TimeOrderRecover-ordering-Results]": [ + { + "checksum": "714788c5fd05a14f765c99a804f434dc", + "size": 3194, + "uri": "https://storage.yandex-team.ru/get-devtools/1942671/1db4d4843d1839221ea8ae53e65e00062ca57f97/resource.tar.gz#test.test_TimeOrderRecover-ordering-Results_/results.txt" + } + ], "test.test[Udf-AutoMapManyNamed-Debug]": [ { "checksum": "7ebb3a85fba24cde1eb2feee9c7196bd", |
