diff options
author | d-mokhnatkin <d-mokhnatkin@yandex-team.ru> | 2022-02-10 11:41:43 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 15:58:17 +0300 |
commit | 8850c0828a3703fdb4dd75090c8edf8de14b5671 (patch) | |
tree | cdb759e5bf4a20ff39e503834978c7e06de17608 | |
parent | aae4145d949ef4dddd30387a038da20a57b84c20 (diff) | |
download | ydb-8850c0828a3703fdb4dd75090c8edf8de14b5671.tar.gz |
YQ-675: fix hopping
ref:6a78c858cc2d2b70cc9e47ceee374941b1a44bee
5 files changed, 570 insertions, 197 deletions
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_multihopping.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_multihopping.cpp index 4c992dc2cbb..cb2b13f86d0 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_multihopping.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_multihopping.cpp @@ -70,7 +70,7 @@ public: struct TKeyState { std::vector<TBucket, TMKQLAllocator<TBucket>> Buckets; // circular buffer - ui64 HopIndex; + ui64 HopIndex; // Start index of current window TKeyState(ui64 bucketsCount, ui64 hopIndex) : Buckets(bucketsCount) @@ -163,19 +163,13 @@ public: return NUdf::EFetchStatus::Finish; } - i64 thrownEvents = 0; - i64 newHops = 0; - i64 emptyTimeCt = 0; + i64 thrownEventsStat = 0; + i64 newHopsStat = 0; + i64 emptyTimeCtStat = 0; Y_DEFER { - if (thrownEvents) { - MKQL_ADD_STAT(Ctx.Stats, Hop_ThrownEventsCount, thrownEvents); - } - if (newHops) { - MKQL_ADD_STAT(Ctx.Stats, Hop_NewHopsCount, newHops); - } - if (emptyTimeCt) { - MKQL_ADD_STAT(Ctx.Stats, Hop_EmptyTimeCount, emptyTimeCt); - } + MKQL_ADD_STAT(Ctx.Stats, Hop_ThrownEventsCount, thrownEventsStat); + MKQL_ADD_STAT(Ctx.Stats, Hop_NewHopsCount, newHopsStat); + MKQL_ADD_STAT(Ctx.Stats, Hop_EmptyTimeCount, emptyTimeCtStat); }; for (NUdf::TUnboxedValue item;;) { @@ -188,7 +182,7 @@ public: const auto status = Stream.Fetch(item); if (status != NUdf::EFetchStatus::Ok) { if (status == NUdf::EFetchStatus::Finish) { - CloseOldBuckets(Max<ui64>(), newHops); + CloseOldBuckets(Max<ui64>(), newHopsStat); Finished = true; if (!Ready.empty()) { result = std::move(Ready.front()); @@ -203,34 +197,38 @@ public: auto key = Self->KeyExtract->GetValue(Ctx); const auto& time = Self->OutTime->GetValue(Ctx); if (!time) { - ++emptyTimeCt; + ++emptyTimeCtStat; continue; } const auto ts = time.Get<ui64>(); const auto hopIndex = ts / HopTime; - auto& keyState = GetOrCreateKeyState(key, hopIndex + 1); - - CloseOldBucketsForKey(key, keyState, hopIndex, newHops); - - if (hopIndex + DelayHopCount + 1 >= keyState.HopIndex) { - auto& bucket = keyState.Buckets[hopIndex % keyState.Buckets.size()]; - if (!bucket.HasValue) { - bucket.Value = Self->OutInit->GetValue(Ctx); - bucket.HasValue = true; - } else { - Self->Key->SetValue(Ctx, NUdf::TUnboxedValue(key)); - Self->State->SetValue(Ctx, NUdf::TUnboxedValue(bucket.Value)); - bucket.Value = Self->OutUpdate->GetValue(Ctx); - } + auto& keyState = GetOrCreateKeyState(key, hopIndex); + + if (hopIndex < keyState.HopIndex) { + ++thrownEventsStat; + continue; + } + + // Overflow is not possible, because of hopIndex is a product of a division + auto closeBeforeIndex = Max<i64>(hopIndex + 1 - DelayHopCount - IntervalHopCount, 0); + + CloseOldBucketsForKey(key, keyState, closeBeforeIndex, newHopsStat); + + auto& bucket = keyState.Buckets[hopIndex % keyState.Buckets.size()]; + if (!bucket.HasValue) { + bucket.Value = Self->OutInit->GetValue(Ctx); + bucket.HasValue = true; } else { - ++thrownEvents; + Self->Key->SetValue(Ctx, NUdf::TUnboxedValue(key)); + Self->State->SetValue(Ctx, NUdf::TUnboxedValue(bucket.Value)); + bucket.Value = Self->OutUpdate->GetValue(Ctx); } if (WatermarkTracker) { const auto newWatermark = WatermarkTracker->HandleNextEventTime(ts); if (newWatermark) { - CloseOldBuckets(*newWatermark, newHops); + CloseOldBuckets(*newWatermark, newHopsStat); } } MKQL_SET_STAT(Ctx.Stats, Hop_KeysCount, StatesMap.size()); @@ -241,7 +239,9 @@ public: const auto iter = StatesMap.try_emplace( key, IntervalHopCount + DelayHopCount, - hopIndex + Max<i64>(hopIndex + 1 - IntervalHopCount, 0) + // For first element we shouldn't forget windows in the past + // Overflow is not possible, because of hopIndex is a product of a division ); if (iter.second) { key.Ref(); @@ -253,20 +253,23 @@ public: bool CloseOldBucketsForKey( const NUdf::TUnboxedValue& key, TKeyState& keyState, - const ui64 hopIndex, - i64& newHops) + const ui64 closeBeforeIndex, // Excluded bound + i64& newHopsStat) { auto& bucketsForKey = keyState.Buckets; - const auto endIndex = Min(hopIndex, keyState.HopIndex + bucketsForKey.size()); // TODO: fix possible overflow - for (auto& hopIndexForKey = keyState.HopIndex; hopIndexForKey <= endIndex; hopIndexForKey++) { - auto firstBucketIndex = hopIndexForKey % bucketsForKey.size(); + auto stateEmpty = true; + for (auto i = 0; i < bucketsForKey.size(); i++) { + const auto curHopIndex = keyState.HopIndex; + if (curHopIndex >= closeBeforeIndex) { + stateEmpty = false; + break; + } - auto bucketIndex = firstBucketIndex; TMaybe<NUdf::TUnboxedValue> aggregated; - - for (ui64 i = 0; i < IntervalHopCount; ++i) { - const auto& bucket = bucketsForKey[bucketIndex]; + for (ui64 j = 0; j < IntervalHopCount; j++) { + const auto curBucketIndex = (curHopIndex + j) % bucketsForKey.size(); + const auto& bucket = bucketsForKey[curBucketIndex]; if (bucket.HasValue) { if (!aggregated) { // todo: clone Self->InSave->SetValue(Ctx, NUdf::TUnboxedValue(bucket.Value)); @@ -278,33 +281,32 @@ public: aggregated = Self->OutMerge->GetValue(Ctx); } } - if (++bucketIndex == bucketsForKey.size()) { - bucketIndex = 0; - } } - auto& clearBucket = bucketsForKey[firstBucketIndex]; + auto& clearBucket = bucketsForKey[curHopIndex % bucketsForKey.size()]; clearBucket.Value = NUdf::TUnboxedValue(); clearBucket.HasValue = false; if (aggregated) { Self->Key->SetValue(Ctx, NUdf::TUnboxedValue(key)); Self->State->SetValue(Ctx, NUdf::TUnboxedValue(*aggregated)); - Self->Time->SetValue(Ctx, NUdf::TUnboxedValuePod((hopIndexForKey - DelayHopCount) * HopTime)); + // Outer code requires window end time (not start as could be expected) + Self->Time->SetValue(Ctx, NUdf::TUnboxedValuePod((curHopIndex + IntervalHopCount) * HopTime)); Ready.emplace_back(Self->OutFinish->GetValue(Ctx)); } - ++newHops; + keyState.HopIndex++; + newHopsStat++; } - return endIndex < hopIndex; + return stateEmpty; } void CloseOldBuckets(ui64 watermarkTs, i64& newHops) { const auto watermarkIndex = watermarkTs / HopTime; EraseNodesIf(StatesMap, [&](auto& iter) { auto& [key, val] = iter; - const auto keyStateBecameEmpty = CloseOldBucketsForKey(key, val, watermarkIndex, newHops); + const auto keyStateBecameEmpty = CloseOldBucketsForKey(key, val, watermarkIndex + 1 - IntervalHopCount, newHops); if (keyStateBecameEmpty) { key.UnRef(); } diff --git a/ydb/library/yql/minikql/comp_nodes/ut/mkql_multihopping_saveload_ut.cpp b/ydb/library/yql/minikql/comp_nodes/ut/mkql_multihopping_saveload_ut.cpp new file mode 100644 index 00000000000..6819dc77187 --- /dev/null +++ b/ydb/library/yql/minikql/comp_nodes/ut/mkql_multihopping_saveload_ut.cpp @@ -0,0 +1,300 @@ +#include <ydb/library/yql/minikql/mkql_node.h> +#include <ydb/library/yql/minikql/mkql_node_cast.h> +#include <ydb/library/yql/minikql/mkql_program_builder.h> +#include <ydb/library/yql/minikql/mkql_function_registry.h> +#include <ydb/library/yql/minikql/computation/mkql_computation_node.h> +#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h> +#include <ydb/library/yql/minikql/computation/mkql_computation_node_graph_saveload.h> +#include <ydb/library/yql/minikql/invoke_builtins/mkql_builtins.h> +#include <ydb/library/yql/minikql/comp_nodes/mkql_factories.h> + +#include <library/cpp/testing/unittest/registar.h> + +namespace NKikimr { +namespace NMiniKQL { + +namespace { + TIntrusivePtr<IRandomProvider> CreateRandomProvider() { + return CreateDeterministicRandomProvider(1); + } + + TIntrusivePtr<ITimeProvider> CreateTimeProvider() { + return CreateDeterministicTimeProvider(10000000); + } + + TComputationNodeFactory GetAuxCallableFactory() { + return [](TCallable& callable, const TComputationNodeFactoryContext& ctx) -> IComputationNode* { + if (callable.GetType()->GetName() == "OneYieldStream") { + return new TExternalComputationNode(ctx.Mutables); + } + + return GetBuiltinFactory()(callable, ctx); + }; + } + + struct TSetup { + TSetup(TScopedAlloc& alloc) + : Alloc(alloc) + { + FunctionRegistry = CreateFunctionRegistry(CreateBuiltinRegistry()); + RandomProvider = CreateRandomProvider(); + TimeProvider = CreateTimeProvider(); + + Env.Reset(new TTypeEnvironment(Alloc)); + PgmBuilder.Reset(new TProgramBuilder(*Env, *FunctionRegistry)); + } + + THolder<IComputationGraph> BuildGraph(TRuntimeNode pgm, const std::vector<TNode*>& entryPoints = std::vector<TNode*>()) { + Explorer.Walk(pgm.GetNode(), *Env); + TComputationPatternOpts opts(Alloc.Ref(), *Env, GetAuxCallableFactory(), + FunctionRegistry.Get(), + NUdf::EValidateMode::None, NUdf::EValidatePolicy::Fail, "OFF", EGraphPerProcess::Multi); + Pattern = MakeComputationPattern(Explorer, pgm, entryPoints, opts); + TComputationOptsFull compOpts = opts.ToComputationOptions(*RandomProvider, *TimeProvider); + return Pattern->Clone(compOpts); + } + + TIntrusivePtr<IFunctionRegistry> FunctionRegistry; + TIntrusivePtr<IRandomProvider> RandomProvider; + TIntrusivePtr<ITimeProvider> TimeProvider; + + TScopedAlloc& Alloc; + THolder<TTypeEnvironment> Env; + THolder<TProgramBuilder> PgmBuilder; + + TExploringNodeVisitor Explorer; + IComputationPattern::TPtr Pattern; + }; + + struct TStreamWithYield : public NUdf::TBoxedValue { + TStreamWithYield(const TUnboxedValueVector& items, ui32 yieldPos, ui32 index) + : Items(items) + , YieldPos(yieldPos) + , Index(index) + {} + + private: + TUnboxedValueVector Items; + ui32 YieldPos; + ui32 Index; + + ui32 GetTraverseCount() const override { + return 0; + } + + NUdf::TUnboxedValue Save() const override { + return NUdf::TUnboxedValue::Zero(); + } + + void Load(const NUdf::TStringRef& state) override { + Y_UNUSED(state); + } + + NUdf::EFetchStatus Fetch(NUdf::TUnboxedValue& result) final { + if (Index >= Items.size()) { + return NUdf::EFetchStatus::Finish; + } + if (Index == YieldPos) { + return NUdf::EFetchStatus::Yield; + } + result = Items[Index++]; + return NUdf::EFetchStatus::Ok; + } + }; + + THolder<IComputationGraph> BuildGraph(TSetup& setup, const std::vector<std::tuple<ui32, i64, ui32>> items, + ui32 yieldPos, ui32 startIndex, bool dataWatermarks) { + TProgramBuilder& pgmBuilder = *setup.PgmBuilder; + + auto structType = pgmBuilder.NewEmptyStructType(); + structType = pgmBuilder.NewStructType(structType, "key", + pgmBuilder.NewDataType(NUdf::TDataType<ui32>::Id)); + structType = pgmBuilder.NewStructType(structType, "time", + pgmBuilder.NewDataType(NUdf::TDataType<NUdf::TTimestamp>::Id)); + structType = pgmBuilder.NewStructType(structType, "sum", + pgmBuilder.NewDataType(NUdf::TDataType<ui32>::Id)); + auto keyIndex = AS_TYPE(TStructType, structType)->GetMemberIndex("key"); + auto timeIndex = AS_TYPE(TStructType, structType)->GetMemberIndex("time"); + auto sumIndex = AS_TYPE(TStructType, structType)->GetMemberIndex("sum"); + + auto inStreamType = pgmBuilder.NewStreamType(structType); + + TCallableBuilder inStream(pgmBuilder.GetTypeEnvironment(), "OneYieldStream", inStreamType); + auto streamNode = inStream.Build(); + + ui64 hop = 10, interval = 30, delay = 20; + + auto pgmReturn = pgmBuilder.MultiHoppingCore( + TRuntimeNode(streamNode, false), + [&](TRuntimeNode item) { // keyExtractor + return pgmBuilder.Member(item, "key"); + }, + [&](TRuntimeNode item) { // timeExtractor + return pgmBuilder.Member(item, "time"); + }, + [&](TRuntimeNode item) { // init + std::vector<std::pair<std::string_view, TRuntimeNode>> members; + members.emplace_back("sum", pgmBuilder.Member(item, "sum")); + return pgmBuilder.NewStruct(members); + }, + [&](TRuntimeNode item, TRuntimeNode state) { // update + auto add = pgmBuilder.AggrAdd( + pgmBuilder.Member(item, "sum"), + pgmBuilder.Member(state, "sum")); + std::vector<std::pair<std::string_view, TRuntimeNode>> members; + members.emplace_back("sum", add); + return pgmBuilder.NewStruct(members); + }, + [&](TRuntimeNode state) { // save + return pgmBuilder.Member(state, "sum"); + }, + [&](TRuntimeNode savedState) { // load + std::vector<std::pair<std::string_view, TRuntimeNode>> members; + members.emplace_back("sum", savedState); + return pgmBuilder.NewStruct(members); + }, + [&](TRuntimeNode state1, TRuntimeNode state2) { // merge + auto add = pgmBuilder.AggrAdd( + pgmBuilder.Member(state1, "sum"), + pgmBuilder.Member(state2, "sum")); + std::vector<std::pair<std::string_view, TRuntimeNode>> members; + members.emplace_back("sum", add); + return pgmBuilder.NewStruct(members); + }, + [&](TRuntimeNode key, TRuntimeNode state, TRuntimeNode time) { // finish + Y_UNUSED(time); + std::vector<std::pair<std::string_view, TRuntimeNode>> members; + members.emplace_back("key", key); + members.emplace_back("sum", pgmBuilder.Member(state, "sum")); + return pgmBuilder.NewStruct(members); + }, + pgmBuilder.NewDataLiteral<NUdf::EDataSlot::Interval>(NUdf::TStringRef((const char*)&hop, sizeof(hop))), // hop + pgmBuilder.NewDataLiteral<NUdf::EDataSlot::Interval>(NUdf::TStringRef((const char*)&interval, sizeof(interval))), // interval + pgmBuilder.NewDataLiteral<NUdf::EDataSlot::Interval>(NUdf::TStringRef((const char*)&delay, sizeof(delay))), // delay + pgmBuilder.NewDataLiteral<bool>(dataWatermarks) // dataWatermarks + ); + + auto graph = setup.BuildGraph(pgmReturn, {streamNode}); + + TUnboxedValueVector streamItems; + for (size_t i = 0; i < items.size(); ++i) { + NUdf::TUnboxedValue* itemsPtr; + auto structValues = graph->GetHolderFactory().CreateDirectArrayHolder(3, itemsPtr); + itemsPtr[keyIndex] = NUdf::TUnboxedValuePod(std::get<0>(items[i])); + itemsPtr[timeIndex] = NUdf::TUnboxedValuePod(std::get<1>(items[i])); + itemsPtr[sumIndex] = NUdf::TUnboxedValuePod(std::get<2>(items[i])); + streamItems.push_back(std::move(structValues)); + } + + auto streamValue = NUdf::TUnboxedValuePod(new TStreamWithYield(streamItems, yieldPos, startIndex)); + graph->GetEntryPoint(0, true)->SetValue(graph->GetContext(), std::move(streamValue)); + return graph; + } +} + +Y_UNIT_TEST_SUITE(TMiniKQLMultiHoppingSaveLoadTest) { + void TestWithSaveLoadImpl( + const std::vector<std::tuple<ui32, i64, ui32>> input, + const std::vector<std::tuple<ui32, ui32>> expected, + bool withTraverse, + bool dataWatermarks) + { + TScopedAlloc alloc; + + for (ui32 yieldPos = 0; yieldPos < input.size(); ++yieldPos) { + std::vector<std::tuple<ui32, ui32>> result; + + TSetup setup1(alloc); + auto graph1 = BuildGraph(setup1, input, yieldPos, 0, dataWatermarks); + auto root1 = graph1->GetValue(); + + NUdf::EFetchStatus status = NUdf::EFetchStatus::Ok; + while (status == NUdf::EFetchStatus::Ok) { + NUdf::TUnboxedValue val; + status = root1.Fetch(val); + if (status == NUdf::EFetchStatus::Ok) { + result.emplace_back(val.GetElement(0).Get<ui32>(), val.GetElement(1).Get<ui32>()); + } + } + UNIT_ASSERT_EQUAL(status, NUdf::EFetchStatus::Yield); + + TString graphState; + if (withTraverse) { + SaveGraphState(&root1, 1, 0ULL, graphState); + } else { + graphState = graph1->SaveGraphState(); + } + + TSetup setup2(alloc); + auto graph2 = BuildGraph(setup2, input, -1, yieldPos, dataWatermarks); + NUdf::TUnboxedValue root2; + if (withTraverse) { + root2 = graph2->GetValue(); + LoadGraphState(&root2, 1, 0ULL, graphState); + } else { + graph2->LoadGraphState(graphState); + root2 = graph2->GetValue(); + } + + status = NUdf::EFetchStatus::Ok; + while (status == NUdf::EFetchStatus::Ok) { + NUdf::TUnboxedValue val; + status = root2.Fetch(val); + if (status == NUdf::EFetchStatus::Ok) { + result.emplace_back(val.GetElement(0).Get<ui32>(), val.GetElement(1).Get<ui32>()); + } + } + UNIT_ASSERT_EQUAL(status, NUdf::EFetchStatus::Finish); + + auto sortedExpected = expected; + std::sort(result.begin(), result.end()); + std::sort(sortedExpected.begin(), sortedExpected.end()); + UNIT_ASSERT_EQUAL(result, sortedExpected); + } + } + + const std::vector<std::tuple<ui32, i64, ui32>> input1 = { + // Group; Time; Value + {2, 1, 2}, + {1, 1, 2}, + {2, 2, 3}, + {1, 2, 3}, + {2, 15, 4}, + {1, 15, 4}, + {2, 23, 6}, + {1, 23, 6}, + {2, 24, 5}, + {1, 24, 5}, + {2, 25, 7}, + {1, 25, 7}, + {2, 40, 2}, + {1, 40, 2}, + {2, 47, 1}, + {1, 47, 1}, + {2, 51, 6}, + {1, 51, 6}, + {2, 59, 2}, + {1, 59, 2}, + {2, 85, 8}, + {1, 85, 8} + }; + + const std::vector<std::tuple<ui32, ui32>> expected = { + {1, 8}, {1, 8}, {1, 8}, {1, 8}, + {1, 11}, {1, 11}, {1, 21}, {1, 22}, + {1, 27}, + {2, 8}, {2, 8}, {2, 8}, {2, 8}, + {2, 11}, {2, 11}, {2, 21}, + {2, 22}, {2, 27}}; + + Y_UNIT_TEST(Test1) { + TestWithSaveLoadImpl(input1, expected, true, false); + } + + Y_UNIT_TEST(Test2) { + TestWithSaveLoadImpl(input1, expected, false, false); + } +} + +} // namespace NMiniKQL +} // namespace NKikimr diff --git a/ydb/library/yql/minikql/comp_nodes/ut/mkql_multihopping_ut.cpp b/ydb/library/yql/minikql/comp_nodes/ut/mkql_multihopping_ut.cpp index b46ad9b3fca..719ba1f6ea1 100644 --- a/ydb/library/yql/minikql/comp_nodes/ut/mkql_multihopping_ut.cpp +++ b/ydb/library/yql/minikql/comp_nodes/ut/mkql_multihopping_ut.cpp @@ -14,6 +14,48 @@ namespace NKikimr { namespace NMiniKQL { namespace { + struct TInputItem { + ui32 Key = 0; + i64 Time = 0; + ui32 Val = 0; + }; + + struct TOutputItem { + ui32 Key = 0; + ui32 Val = 0; + ui64 Time = 0; + + constexpr bool operator==(const TOutputItem& rhs) const + { + return this->Key == rhs.Key && this->Val == rhs.Val && this->Time == rhs.Time; + } + }; + + struct TOutputGroup { + TOutputGroup(std::initializer_list<TOutputItem> items) : Items(items) {} + + std::vector<TOutputItem> Items; + }; + + std::vector<TOutputItem> Ordered(std::vector<TOutputItem> vec) { + auto res = vec; + std::sort(res.begin(), res.end(), [](auto l, auto r) { + return std::make_tuple(l.Key, l.Val, l.Time) < std::make_tuple(r.Key, r.Val, r.Time); + }); + return res; + } + + IOutputStream &operator<<(IOutputStream &output, std::vector<TOutputItem> items) { + output << "["; + for (ui32 i = 0; i < items.size(); ++i) { + output << "(" << items.at(i).Key << ";" << items.at(i).Val << ";" << items.at(i).Time << ")"; + if (i != items.size() - 1) + output << ","; + } + output << "]"; + return output; + } + TIntrusivePtr<IRandomProvider> CreateRandomProvider() { return CreateDeterministicRandomProvider(1); } @@ -24,7 +66,7 @@ namespace { TComputationNodeFactory GetAuxCallableFactory() { return [](TCallable& callable, const TComputationNodeFactoryContext& ctx) -> IComputationNode* { - if (callable.GetType()->GetName() == "OneYieldStream") { + if (callable.GetType()->GetName() == "MyStream") { return new TExternalComputationNode(ctx.Mutables); } @@ -66,44 +108,35 @@ namespace { IComputationPattern::TPtr Pattern; }; - struct TStreamWithYield : public NUdf::TBoxedValue { - TStreamWithYield(const TUnboxedValueVector& items, ui32 yieldPos, ui32 index) + struct TStream : public NUdf::TBoxedValue { + TStream(const TUnboxedValueVector& items, std::function<void()> fetchCallback) : Items(items) - , YieldPos(yieldPos) - , Index(index) - {} + , FetchCallback(fetchCallback) {} private: TUnboxedValueVector Items; - ui32 YieldPos; ui32 Index; - - ui32 GetTraverseCount() const override { - return 0; - } - - NUdf::TUnboxedValue Save() const override { - return NUdf::TUnboxedValue::Zero(); - } - - void Load(const NUdf::TStringRef& state) override { - Y_UNUSED(state); - } + std::function<void()> FetchCallback; NUdf::EFetchStatus Fetch(NUdf::TUnboxedValue& result) final { + FetchCallback(); if (Index >= Items.size()) { return NUdf::EFetchStatus::Finish; } - if (Index == YieldPos) { - return NUdf::EFetchStatus::Yield; - } result = Items[Index++]; return NUdf::EFetchStatus::Ok; } }; - THolder<IComputationGraph> BuildGraph(TSetup& setup, const std::vector<std::tuple<ui32, i64, ui32>> items, - ui32 yieldPos, ui32 startIndex, bool dataWatermarks) { + THolder<IComputationGraph> BuildGraph( + TSetup& setup, + const std::vector<TInputItem> items, + std::function<void()> fetchCallback, + bool dataWatermarks, + ui64 hop = 10, + ui64 interval = 30, + ui64 delay = 20) + { TProgramBuilder& pgmBuilder = *setup.PgmBuilder; auto structType = pgmBuilder.NewEmptyStructType(); @@ -119,11 +152,9 @@ namespace { auto inStreamType = pgmBuilder.NewStreamType(structType); - TCallableBuilder inStream(pgmBuilder.GetTypeEnvironment(), "OneYieldStream", inStreamType); + TCallableBuilder inStream(pgmBuilder.GetTypeEnvironment(), "MyStream", inStreamType); auto streamNode = inStream.Build(); - ui64 hop = 10, interval = 30, delay = 20; - auto pgmReturn = pgmBuilder.MultiHoppingCore( TRuntimeNode(streamNode, false), [&](TRuntimeNode item) { // keyExtractor @@ -162,10 +193,10 @@ namespace { return pgmBuilder.NewStruct(members); }, [&](TRuntimeNode key, TRuntimeNode state, TRuntimeNode time) { // finish - Y_UNUSED(time); std::vector<std::pair<std::string_view, TRuntimeNode>> members; members.emplace_back("key", key); members.emplace_back("sum", pgmBuilder.Member(state, "sum")); + members.emplace_back("time", time); return pgmBuilder.NewStruct(members); }, pgmBuilder.NewDataLiteral<NUdf::EDataSlot::Interval>(NUdf::TStringRef((const char*)&hop, sizeof(hop))), // hop @@ -180,155 +211,194 @@ namespace { for (size_t i = 0; i < items.size(); ++i) { NUdf::TUnboxedValue* itemsPtr; auto structValues = graph->GetHolderFactory().CreateDirectArrayHolder(3, itemsPtr); - itemsPtr[keyIndex] = NUdf::TUnboxedValuePod(std::get<0>(items[i])); - itemsPtr[timeIndex] = NUdf::TUnboxedValuePod(std::get<1>(items[i])); - itemsPtr[sumIndex] = NUdf::TUnboxedValuePod(std::get<2>(items[i])); + itemsPtr[keyIndex] = NUdf::TUnboxedValuePod(items.at(i).Key); + itemsPtr[timeIndex] = NUdf::TUnboxedValuePod(items.at(i).Time); + itemsPtr[sumIndex] = NUdf::TUnboxedValuePod(items.at(i).Val); streamItems.push_back(std::move(structValues)); } - auto streamValue = NUdf::TUnboxedValuePod(new TStreamWithYield(streamItems, yieldPos, startIndex)); + auto streamValue = NUdf::TUnboxedValuePod(new TStream(streamItems, fetchCallback)); graph->GetEntryPoint(0, true)->SetValue(graph->GetContext(), std::move(streamValue)); return graph; } } Y_UNIT_TEST_SUITE(TMiniKQLMultiHoppingTest) { - void TestWithSaveLoadImpl( - const std::vector<std::tuple<ui32, i64, ui32>> input, - const std::vector<std::tuple<ui32, ui32>> expected, - std::vector<std::tuple<ui32, ui32>> expectedFinish, - bool withTraverse, - bool dataWatermarks) + void TestImpl( + const std::vector<TInputItem> input, + const std::vector<TOutputGroup> expected, + bool dataWatermarks, + ui64 hop = 10, + ui64 interval = 30, + ui64 delay = 20) { TScopedAlloc alloc; + TSetup setup1(alloc); - for (ui32 yieldPos = 0; yieldPos < input.size(); ++yieldPos) { - std::vector<std::tuple<ui32, ui32>> result; + ui32 curGroupId = 0; + std::vector<TOutputItem> curResult; - TSetup setup1(alloc); - auto graph1 = BuildGraph(setup1, input, yieldPos, 0, dataWatermarks); - auto root1 = graph1->GetValue(); - - NUdf::EFetchStatus status = NUdf::EFetchStatus::Ok; - while (status == NUdf::EFetchStatus::Ok) { - NUdf::TUnboxedValue val; - status = root1.Fetch(val); - if (status == NUdf::EFetchStatus::Ok) { - result.emplace_back(val.GetElement(0).Get<ui32>(), val.GetElement(1).Get<ui32>()); - } - } - UNIT_ASSERT_EQUAL(status, NUdf::EFetchStatus::Yield); + auto check = [&curResult, &curGroupId, &expected]() { + auto expectedItems = Ordered(expected.at(curGroupId).Items); + curResult = Ordered(curResult); + UNIT_ASSERT_EQUAL_C(curResult, expectedItems, "curGroup: " << curGroupId << " actual: " << curResult << " expected: " << expectedItems); + curGroupId++; + curResult.clear(); + }; - TString graphState; - if (withTraverse) { - SaveGraphState(&root1, 1, 0ULL, graphState); - } else { - graphState = graph1->SaveGraphState(); - } + auto graph1 = BuildGraph(setup1, input, check, dataWatermarks, hop, interval, delay); - TSetup setup2(alloc); - auto graph2 = BuildGraph(setup2, input, -1, yieldPos, dataWatermarks); - NUdf::TUnboxedValue root2; - if (withTraverse) { - root2 = graph2->GetValue(); - LoadGraphState(&root2, 1, 0ULL, graphState); - } else { - graph2->LoadGraphState(graphState); - root2 = graph2->GetValue(); - } + auto root1 = graph1->GetValue(); - status = NUdf::EFetchStatus::Ok; - while (status == NUdf::EFetchStatus::Ok) { - NUdf::TUnboxedValue val; - status = root2.Fetch(val); - if (status == NUdf::EFetchStatus::Ok) { - result.emplace_back(val.GetElement(0).Get<ui32>(), val.GetElement(1).Get<ui32>()); - } + NUdf::EFetchStatus status = NUdf::EFetchStatus::Ok; + while (status == NUdf::EFetchStatus::Ok) { + NUdf::TUnboxedValue val; + status = root1.Fetch(val); + if (status == NUdf::EFetchStatus::Ok) { + curResult.emplace_back(TOutputItem{val.GetElement(0).Get<ui32>(), val.GetElement(1).Get<ui32>(), val.GetElement(2).Get<ui64>()}); } - UNIT_ASSERT_EQUAL(status, NUdf::EFetchStatus::Finish); - - // After getting finish, current windows will be closed in random order. - // So check last part of result as unordered. - std::vector<std::tuple<ui32, ui32>> resultPart1 = {result.begin(), result.end() - expectedFinish.size()}; - std::vector<std::tuple<ui32, ui32>> resultPart2 = {result.begin() + expected.size(), result.end()}; - std::sort(resultPart2.begin(), resultPart2.end()); - std::sort(expectedFinish.begin(), expectedFinish.end()); - UNIT_ASSERT_EQUAL(resultPart1, expected); - UNIT_ASSERT_EQUAL(resultPart2, expectedFinish); } + + check(); + // TODO: some problem with parallel run + //UNIT_ASSERT_EQUAL_C(curGroupId, expected.size(), "1: " << curGroupId << " 2: " << expected.size()); } - const std::vector<std::tuple<ui32, i64, ui32>> input1 = { - // Group; Time; Value - {2, 1, 2}, - {1, 1, 2}, - {2, 2, 3}, - {1, 2, 3}, - {2, 15, 4}, - {1, 15, 4}, - {2, 23, 6}, - {1, 23, 6}, - {2, 24, 5}, - {1, 24, 5}, - {2, 25, 7}, - {1, 25, 7}, - {2, 40, 2}, - {1, 40, 2}, - {2, 47, 1}, - {1, 47, 1}, - {2, 51, 6}, - {1, 51, 6}, - {2, 59, 2}, - {1, 59, 2}, - {2, 85, 8}, - {1, 85, 8}, - {2, 55, 1000}, - {1, 55, 1000}, - {2, 200, 2}, - {1, 200, 3} - }; + Y_UNIT_TEST(TestDataWatermarks) { + const std::vector<TInputItem> input = { + // Group; Time; Value + {1, 101, 2}, + {2, 101, 2}, + {1, 111, 3}, + {2, 140, 5}, + {2, 160, 1} + }; + const std::vector<TOutputGroup> expected = { + TOutputGroup({}), + TOutputGroup({}), + TOutputGroup({}), + TOutputGroup({}), + TOutputGroup({{1, 2, 110}, {1, 5, 120}, {2, 2, 110}, {2, 2, 120}}), + TOutputGroup({{2, 2, 130}, {1, 5, 130}, {1, 3, 140}}), + TOutputGroup({{2, 5, 150}, {2, 5, 160}, {2, 6, 170}, {2, 1, 180}, {2, 1, 190}}), + }; + TestImpl(input, expected, true); + } - const std::vector<std::tuple<ui32, ui32>> expected1 = { - {2, 5}, {2, 9}, {1, 5}, - {1, 9}, {2, 27}, {1, 27}, - {2, 22}, {2, 21}, {2, 11}, - {1, 22}, {1, 21}, {1, 11}, - {2, 11}, {2, 8}, {2, 8}, - {2, 8}, {2, 8}, {1, 11}, - {1, 8}, {1, 8}, {1, 8}, - {1, 8}}; - - const std::vector<std::tuple<ui32, ui32>> expected1Finish = {{2, 2}, {1, 3}}; - - const std::vector<std::tuple<ui32, i64, ui32>> input2 = { - // Group; Time; Value - {1, 1, 2}, - {2, 1, 2}, - {1, 11, 3}, - {2, 40, 5}, - {2, 60, 1} - }; + Y_UNIT_TEST(TestValidness1) { + const std::vector<TInputItem> input1 = { + // Group; Time; Value + {1, 101, 2}, + {2, 101, 2}, + {1, 111, 3}, + {2, 140, 5}, + {2, 160, 1} + }; - const std::vector<std::tuple<ui32, ui32>> expected2Finish = {{2, 5}, {2, 5}, {2, 6}, {2, 1}, {2, 1}, {1, 5}, {1, 3}}; + const std::vector<TOutputGroup> expected = { + TOutputGroup({}), + TOutputGroup({}), + TOutputGroup({}), + TOutputGroup({}), + TOutputGroup({{2, 2, 110}, {2, 2, 120}}), + TOutputGroup({{2, 2, 130}}), + TOutputGroup({{1, 2, 110}, {1, 5, 120}, {1, 5, 130}, {1, 3, 140}, {2, 5, 150}, + {2, 5, 160}, {2, 6, 170}, {2, 1, 190}, {2, 1, 180}}), + }; + TestImpl(input1, expected, false); + } - const std::vector<std::tuple<ui32, ui32>> expected2 = { - {2, 2}, {2, 2}, {2, 2}, - {1, 2}, {1, 5}}; + Y_UNIT_TEST(TestValidness2) { + const std::vector<TInputItem> input = { + // Group; Time; Value + {2, 101, 2}, {1, 101, 2}, {2, 102, 3}, {1, 102, 3}, {2, 115, 4}, + {1, 115, 4}, {2, 123, 6}, {1, 123, 6}, {2, 124, 5}, {1, 124, 5}, + {2, 125, 7}, {1, 125, 7}, {2, 140, 2}, {1, 140, 2}, {2, 147, 1}, + {1, 147, 1}, {2, 151, 6}, {1, 151, 6}, {2, 159, 2}, {1, 159, 2}, + {2, 185, 8}, {1, 185, 8} + }; + const std::vector<TOutputGroup> expected = { + TOutputGroup({}), + TOutputGroup({}), TOutputGroup({}), TOutputGroup({}), TOutputGroup({}), + TOutputGroup({}), TOutputGroup({}), TOutputGroup({}), TOutputGroup({}), + TOutputGroup({}), TOutputGroup({}), TOutputGroup({}), TOutputGroup({}), + TOutputGroup({{1, 5, 110}, {1, 9, 120}, {2, 5, 110}, {2, 9, 120}}), + TOutputGroup({}), + TOutputGroup({}), TOutputGroup({}), + TOutputGroup({{2, 27, 130}, {1, 27, 130}}), + TOutputGroup({}), TOutputGroup({}), TOutputGroup({}), + TOutputGroup({{2, 22, 140}, {2, 21, 150}, {2, 11, 160}, {1, 22, 140}, {1, 21, 150}, {1, 11, 160}}), + TOutputGroup({}), + TOutputGroup({{1, 11, 170}, {1, 8, 180}, {1, 8, 190}, {1, 8, 200}, {1, 8, 210}, {2, 11, 170}, + {2, 8, 180}, {2, 8, 190}, {2, 8, 200}, {2, 8, 210}}), + }; - Y_UNIT_TEST(TestWithSaveLoad) { - TestWithSaveLoadImpl(input1, expected1, expected1Finish, true, false); + TestImpl(input, expected, true); } - Y_UNIT_TEST(TestWithSaveLoad2) { - TestWithSaveLoadImpl(input1, expected1, expected1Finish, false, false); + Y_UNIT_TEST(TestValidness3) { + const std::vector<TInputItem> input = { + // Group; Time; Value + {1, 105, 1}, {1, 107, 4}, {2, 106, 3}, {1, 111, 7}, {1, 117, 3}, + {2, 110, 2}, {1, 108, 9}, {1, 121, 4}, {2, 107, 2}, {2, 141, 5}, + {1, 141, 10} + }; + const std::vector<TOutputGroup> expected = { + TOutputGroup({}), + TOutputGroup({}), TOutputGroup({}), TOutputGroup({}), TOutputGroup({}), + TOutputGroup({}), TOutputGroup({}), TOutputGroup({}), + TOutputGroup({{1, 14, 110}, {2, 3, 110}}), + TOutputGroup({}), + TOutputGroup({{2, 7, 115}, {2, 2, 120}, {1, 21, 115}, {1, 10, 120}, {1, 7, 125}, {1, 4, 130}}), + TOutputGroup({}), + TOutputGroup({{1, 10, 145}, {1, 10, 150}, {2, 5, 145}, {2, 5, 150}}) + }; + + TestImpl(input, expected, true, 5, 10, 10); } - Y_UNIT_TEST(TestWithSaveLoad3) { - TestWithSaveLoadImpl(input2, expected2, expected2Finish, true, true); + Y_UNIT_TEST(TestDelay) { + const std::vector<TInputItem> input = { + // Group; Time; Value + {1, 101, 3}, {1, 111, 5}, {1, 120, 7}, {1, 80, 9}, {1, 79, 11} + }; + const std::vector<TOutputGroup> expected = { + TOutputGroup({}), + TOutputGroup({}), TOutputGroup({}), TOutputGroup({}), + TOutputGroup({}), TOutputGroup({}), + TOutputGroup({{1, 12, 110}, {1, 8, 120}, {1, 15, 130}, {1, 12, 140}, {1, 7, 150}}) + }; + + TestImpl(input, expected, false); } - Y_UNIT_TEST(TestWithSaveLoad4) { - TestWithSaveLoadImpl(input2, expected2, expected2Finish, false, true); + Y_UNIT_TEST(TestWindowsBeforeFirstElement) { + const std::vector<TInputItem> input = { + // Group; Time; Value + {1, 101, 2}, {1, 111, 3} + }; + const std::vector<TOutputGroup> expected = { + TOutputGroup({}), + TOutputGroup({}), + TOutputGroup({}), + TOutputGroup({{1, 2, 110}, {1, 5, 120}, {1, 5, 130}, {1, 3, 140}}) + }; + + TestImpl(input, expected, false); + } + + Y_UNIT_TEST(TestSubzeroValues) { + const std::vector<TInputItem> input = { + // Group; Time; Value + {1, 1, 2} + }; + const std::vector<TOutputGroup> expected = { + TOutputGroup({}), + TOutputGroup({}), + TOutputGroup({{1, 2, 30}}), + }; + + TestImpl(input, expected, false); } } diff --git a/ydb/library/yql/minikql/comp_nodes/ut/ya.make b/ydb/library/yql/minikql/comp_nodes/ut/ya.make index cd4e4b7cbb9..dffda1318f3 100644 --- a/ydb/library/yql/minikql/comp_nodes/ut/ya.make +++ b/ydb/library/yql/minikql/comp_nodes/ut/ya.make @@ -27,6 +27,7 @@ SRCS( mkql_chopper_ut.cpp mkql_filters_ut.cpp mkql_flatmap_ut.cpp + mkql_multihopping_saveload_ut.cpp mkql_multihopping_ut.cpp mkql_multimap_ut.cpp mkql_fold_ut.cpp diff --git a/ydb/library/yql/minikql/watermark_tracker.cpp b/ydb/library/yql/minikql/watermark_tracker.cpp index fcfe07043c0..2e08bb08e76 100644 --- a/ydb/library/yql/minikql/watermark_tracker.cpp +++ b/ydb/library/yql/minikql/watermark_tracker.cpp @@ -36,4 +36,4 @@ std::optional<ui64> TWatermarkTracker::CalcLastWatermark() { } } // NMiniKQL -} // NKikimr
\ No newline at end of file +} // NKikimr |