diff options
| author | vvvv <[email protected]> | 2025-10-09 12:25:18 +0300 |
|---|---|---|
| committer | vvvv <[email protected]> | 2025-10-09 12:57:17 +0300 |
| commit | cb77d014972b2cdb27d2e6d979fc3a2772b27ad4 (patch) | |
| tree | 7f3bcd8ce71c6bd0f3ccc11e31b9f665475b819e /yql/essentials/minikql/computation/mkql_computation_node_graph_saveload_ut.cpp | |
| parent | d58a8990d353b051c27e1069141117fdfde64358 (diff) | |
YQL-20086 minikql
commit_hash:e96f7390db5fcbe7e9f64f898141a263ad522daa
Diffstat (limited to 'yql/essentials/minikql/computation/mkql_computation_node_graph_saveload_ut.cpp')
| -rw-r--r-- | yql/essentials/minikql/computation/mkql_computation_node_graph_saveload_ut.cpp | 632 |
1 files changed, 316 insertions, 316 deletions
diff --git a/yql/essentials/minikql/computation/mkql_computation_node_graph_saveload_ut.cpp b/yql/essentials/minikql/computation/mkql_computation_node_graph_saveload_ut.cpp index 1527d598a31..caffdd5e5ab 100644 --- a/yql/essentials/minikql/computation/mkql_computation_node_graph_saveload_ut.cpp +++ b/yql/essentials/minikql/computation/mkql_computation_node_graph_saveload_ut.cpp @@ -14,368 +14,368 @@ namespace NKikimr { namespace NMiniKQL { namespace { - TIntrusivePtr<IRandomProvider> CreateRandomProvider() { - return CreateDeterministicRandomProvider(1); - } +TIntrusivePtr<IRandomProvider> CreateRandomProvider() { + return CreateDeterministicRandomProvider(1); +} - TIntrusivePtr<ITimeProvider> CreateTimeProvider() { - return CreateDeterministicTimeProvider(10000000); - } +TIntrusivePtr<ITimeProvider> CreateTimeProvider() { + return CreateDeterministicTimeProvider(10000000); +} - TComputationNodeFactory GetAuxCallableFactory() { - return [](TCallable& callable, const TComputationNodeFactoryContext& ctx) -> IComputationNode* { - if (callable.GetType()->GetName() == "OneYieldStream") { - return new TExternalComputationNode(ctx.Mutables); - } +TComputationNodeFactory GetAuxCallableFactory() { + return [](TCallable& callable, const TComputationNodeFactoryContext& ctx) -> IComputationNode* { + if (callable.GetType()->GetName() == "OneYieldStream") { + return new TExternalComputationNode(ctx.Mutables); + } - return GetBuiltinFactory()(callable, ctx); - }; + return GetBuiltinFactory()(callable, ctx); + }; +} + +struct TSetup { + TSetup(TScopedAlloc& alloc) + : Alloc(alloc) + { + FunctionRegistry = CreateFunctionRegistry(CreateBuiltinRegistry()); + RandomProvider = CreateRandomProvider(); + TimeProvider = CreateTimeProvider(); + + Env.Reset(new TTypeEnvironment(Alloc)); + PgmBuilder.Reset(new TProgramBuilder(*Env, *FunctionRegistry)); } - struct TSetup { - TSetup(TScopedAlloc& alloc) - : Alloc(alloc) - { - FunctionRegistry = CreateFunctionRegistry(CreateBuiltinRegistry()); - RandomProvider = CreateRandomProvider(); - TimeProvider = CreateTimeProvider(); + THolder<IComputationGraph> BuildGraph(TRuntimeNode pgm, const std::vector<TNode*>& entryPoints = std::vector<TNode*>()) { + Explorer.Walk(pgm.GetNode(), *Env); + TComputationPatternOpts opts(Alloc.Ref(), *Env, GetAuxCallableFactory(), + FunctionRegistry.Get(), + NUdf::EValidateMode::None, NUdf::EValidatePolicy::Fail, "OFF", EGraphPerProcess::Multi); + Pattern = MakeComputationPattern(Explorer, pgm, entryPoints, opts); + TComputationOptsFull compOpts = opts.ToComputationOptions(*RandomProvider, *TimeProvider); + return Pattern->Clone(compOpts); + } - Env.Reset(new TTypeEnvironment(Alloc)); - PgmBuilder.Reset(new TProgramBuilder(*Env, *FunctionRegistry)); - } + TIntrusivePtr<IFunctionRegistry> FunctionRegistry; + TIntrusivePtr<IRandomProvider> RandomProvider; + TIntrusivePtr<ITimeProvider> TimeProvider; - THolder<IComputationGraph> BuildGraph(TRuntimeNode pgm, const std::vector<TNode*>& entryPoints = std::vector<TNode*>()) { - Explorer.Walk(pgm.GetNode(), *Env); - TComputationPatternOpts opts(Alloc.Ref(), *Env, GetAuxCallableFactory(), - FunctionRegistry.Get(), - NUdf::EValidateMode::None, NUdf::EValidatePolicy::Fail, "OFF", EGraphPerProcess::Multi); - Pattern = MakeComputationPattern(Explorer, pgm, entryPoints, opts); - TComputationOptsFull compOpts = opts.ToComputationOptions(*RandomProvider, *TimeProvider); - return Pattern->Clone(compOpts); - } + TScopedAlloc& Alloc; + THolder<TTypeEnvironment> Env; + THolder<TProgramBuilder> PgmBuilder; - TIntrusivePtr<IFunctionRegistry> FunctionRegistry; - TIntrusivePtr<IRandomProvider> RandomProvider; - TIntrusivePtr<ITimeProvider> TimeProvider; + TExploringNodeVisitor Explorer; + IComputationPattern::TPtr Pattern; +}; - TScopedAlloc& Alloc; - THolder<TTypeEnvironment> Env; - THolder<TProgramBuilder> PgmBuilder; +struct TStreamWithYield: public NUdf::TBoxedValue { + TStreamWithYield(const TUnboxedValueVector& items, ui32 yieldPos, ui32 index) + : Items_(items) + , YieldPos_(yieldPos) + , Index_(index) + { + } - TExploringNodeVisitor Explorer; - IComputationPattern::TPtr Pattern; - }; +private: + TUnboxedValueVector Items_; + ui32 YieldPos_; + ui32 Index_; - struct TStreamWithYield : public NUdf::TBoxedValue { - TStreamWithYield(const TUnboxedValueVector& items, ui32 yieldPos, ui32 index) - : Items_(items) - , YieldPos_(yieldPos) - , Index_(index) - {} + ui32 GetTraverseCount() const override { + return 0; + } - private: - TUnboxedValueVector Items_; - ui32 YieldPos_; - ui32 Index_; + NUdf::TUnboxedValue Save() const override { + return NUdf::TUnboxedValue::Zero(); + } - ui32 GetTraverseCount() const override { - return 0; - } + bool Load2(const NUdf::TUnboxedValue& state) override { + Y_UNUSED(state); + return false; + } - NUdf::TUnboxedValue Save() const override { - return NUdf::TUnboxedValue::Zero(); + NUdf::EFetchStatus Fetch(NUdf::TUnboxedValue& result) final { + if (Index_ >= Items_.size()) { + return NUdf::EFetchStatus::Finish; } - - bool Load2(const NUdf::TUnboxedValue& state) override { - Y_UNUSED(state); - return false; + if (Index_ == YieldPos_) { + return NUdf::EFetchStatus::Yield; } + result = Items_[Index_++]; + return NUdf::EFetchStatus::Ok; + } +}; +} // namespace - NUdf::EFetchStatus Fetch(NUdf::TUnboxedValue& result) final { - if (Index_ >= Items_.size()) { - return NUdf::EFetchStatus::Finish; - } - if (Index_ == YieldPos_) { - return NUdf::EFetchStatus::Yield; - } - result = Items_[Index_++]; - return NUdf::EFetchStatus::Ok; +Y_UNIT_TEST_SUITE(TMiniKQLSaveLoadTest) { +Y_UNIT_TEST(TestSqueezeSaveLoad) { + TScopedAlloc alloc(__LOCATION__); + + const std::vector<ui32> items = {2, 3, 4, 5, 6, 7, 8}; + + auto buildGraph = [&items](TSetup& setup, ui32 yieldPos, ui32 startIndex) -> THolder<IComputationGraph> { + TProgramBuilder& pgmBuilder = *setup.PgmBuilder; + + auto dataType = pgmBuilder.NewDataType(NUdf::TDataType<ui32>::Id); + auto streamType = pgmBuilder.NewStreamType(dataType); + + TCallableBuilder inStream(pgmBuilder.GetTypeEnvironment(), "OneYieldStream", streamType); + auto streamNode = inStream.Build(); + + auto pgmReturn = pgmBuilder.Squeeze( + TRuntimeNode(streamNode, false), + pgmBuilder.NewDataLiteral<ui32>(1), + [&](TRuntimeNode item, TRuntimeNode state) { + return pgmBuilder.Add(item, state); + }, + [](TRuntimeNode state) { + return state; + }, + [](TRuntimeNode state) { + return state; + }); + + TUnboxedValueVector streamItems; + for (auto item : items) { + streamItems.push_back(NUdf::TUnboxedValuePod(item)); } - }; -} -Y_UNIT_TEST_SUITE(TMiniKQLSaveLoadTest) { - Y_UNIT_TEST(TestSqueezeSaveLoad) { - TScopedAlloc alloc(__LOCATION__); - - const std::vector<ui32> items = {2, 3, 4, 5, 6, 7, 8}; - - auto buildGraph = [&items] (TSetup& setup, ui32 yieldPos, ui32 startIndex) -> THolder<IComputationGraph> { - TProgramBuilder& pgmBuilder = *setup.PgmBuilder; - - auto dataType = pgmBuilder.NewDataType(NUdf::TDataType<ui32>::Id); - auto streamType = pgmBuilder.NewStreamType(dataType); - - TCallableBuilder inStream(pgmBuilder.GetTypeEnvironment(), "OneYieldStream", streamType); - auto streamNode = inStream.Build(); - - auto pgmReturn = pgmBuilder.Squeeze( - TRuntimeNode(streamNode, false), - pgmBuilder.NewDataLiteral<ui32>(1), - [&](TRuntimeNode item, TRuntimeNode state) { - return pgmBuilder.Add(item, state); - }, - [](TRuntimeNode state) { - return state; - }, - [](TRuntimeNode state) { - return state; - }); - - TUnboxedValueVector streamItems; - for (auto item : items) { - streamItems.push_back(NUdf::TUnboxedValuePod(item)); - } + auto graph = setup.BuildGraph(pgmReturn, {streamNode}); + auto streamValue = NUdf::TUnboxedValuePod(new TStreamWithYield(streamItems, yieldPos, startIndex)); + graph->GetEntryPoint(0, true)->SetValue(graph->GetContext(), std::move(streamValue)); + return graph; + }; - auto graph = setup.BuildGraph(pgmReturn, {streamNode}); - auto streamValue = NUdf::TUnboxedValuePod(new TStreamWithYield(streamItems, yieldPos, startIndex)); - graph->GetEntryPoint(0, true)->SetValue(graph->GetContext(), std::move(streamValue)); - return graph; - }; + for (ui32 yieldPos = 0; yieldPos < items.size(); ++yieldPos) { + TSetup setup1(alloc); + auto graph1 = buildGraph(setup1, yieldPos, 0); - for (ui32 yieldPos = 0; yieldPos < items.size(); ++yieldPos) { - TSetup setup1(alloc); - auto graph1 = buildGraph(setup1, yieldPos, 0); + auto root1 = graph1->GetValue(); + NUdf::TUnboxedValue res; + auto status = root1.Fetch(res); + UNIT_ASSERT_EQUAL(status, NUdf::EFetchStatus::Yield); - auto root1 = graph1->GetValue(); - NUdf::TUnboxedValue res; - auto status = root1.Fetch(res); - UNIT_ASSERT_EQUAL(status, NUdf::EFetchStatus::Yield); + TString graphState; + SaveGraphState(&root1, 1, 0ULL, graphState); - TString graphState; - SaveGraphState(&root1, 1, 0ULL, graphState); + TSetup setup2(alloc); + auto graph2 = buildGraph(setup2, -1, yieldPos); - TSetup setup2(alloc); - auto graph2 = buildGraph(setup2, -1, yieldPos); + auto root2 = graph2->GetValue(); + LoadGraphState(&root2, 1, 0ULL, graphState); - auto root2 = graph2->GetValue(); - LoadGraphState(&root2, 1, 0ULL, graphState); + status = root2.Fetch(res); + UNIT_ASSERT_EQUAL(status, NUdf::EFetchStatus::Ok); + UNIT_ASSERT_EQUAL(res.Get<ui32>(), 36); - status = root2.Fetch(res); - UNIT_ASSERT_EQUAL(status, NUdf::EFetchStatus::Ok); - UNIT_ASSERT_EQUAL(res.Get<ui32>(), 36); + status = root2.Fetch(res); + UNIT_ASSERT_EQUAL(status, NUdf::EFetchStatus::Finish); + } +} - status = root2.Fetch(res); - UNIT_ASSERT_EQUAL(status, NUdf::EFetchStatus::Finish); +Y_UNIT_TEST(TestSqueeze1SaveLoad) { + TScopedAlloc alloc(__LOCATION__); + + const std::vector<ui32> items = {1, 2, 3, 4, 5, 6, 7, 8}; + + auto buildGraph = [&items](TSetup& setup, ui32 yieldPos, ui32 startIndex) -> THolder<IComputationGraph> { + TProgramBuilder& pgmBuilder = *setup.PgmBuilder; + + auto dataType = pgmBuilder.NewDataType(NUdf::TDataType<ui32>::Id); + auto streamType = pgmBuilder.NewStreamType(dataType); + + TCallableBuilder inStream(pgmBuilder.GetTypeEnvironment(), "OneYieldStream", streamType); + auto streamNode = inStream.Build(); + + auto pgmReturn = pgmBuilder.Squeeze1( + TRuntimeNode(streamNode, false), + [](TRuntimeNode item) { + return item; + }, + [&](TRuntimeNode item, TRuntimeNode state) { + return pgmBuilder.Add(item, state); + }, + [](TRuntimeNode state) { + return state; + }, + [](TRuntimeNode state) { + return state; + }); + + TUnboxedValueVector streamItems; + for (auto item : items) { + streamItems.push_back(NUdf::TUnboxedValuePod(item)); } - } - Y_UNIT_TEST(TestSqueeze1SaveLoad) { - TScopedAlloc alloc(__LOCATION__); - - const std::vector<ui32> items = {1, 2, 3, 4, 5, 6, 7, 8}; - - auto buildGraph = [&items] (TSetup& setup, ui32 yieldPos, ui32 startIndex) -> THolder<IComputationGraph> { - TProgramBuilder& pgmBuilder = *setup.PgmBuilder; - - auto dataType = pgmBuilder.NewDataType(NUdf::TDataType<ui32>::Id); - auto streamType = pgmBuilder.NewStreamType(dataType); - - TCallableBuilder inStream(pgmBuilder.GetTypeEnvironment(), "OneYieldStream", streamType); - auto streamNode = inStream.Build(); - - auto pgmReturn = pgmBuilder.Squeeze1( - TRuntimeNode(streamNode, false), - [](TRuntimeNode item) { - return item; - }, - [&](TRuntimeNode item, TRuntimeNode state) { - return pgmBuilder.Add(item, state); - }, - [](TRuntimeNode state) { - return state; - }, - [](TRuntimeNode state) { - return state; - }); - - TUnboxedValueVector streamItems; - for (auto item : items) { - streamItems.push_back(NUdf::TUnboxedValuePod(item)); - } + auto graph = setup.BuildGraph(pgmReturn, {streamNode}); + auto streamValue = NUdf::TUnboxedValuePod(new TStreamWithYield(streamItems, yieldPos, startIndex)); + graph->GetEntryPoint(0, true)->SetValue(graph->GetContext(), std::move(streamValue)); + return graph; + }; - auto graph = setup.BuildGraph(pgmReturn, {streamNode}); - auto streamValue = NUdf::TUnboxedValuePod(new TStreamWithYield(streamItems, yieldPos, startIndex)); - graph->GetEntryPoint(0, true)->SetValue(graph->GetContext(), std::move(streamValue)); - return graph; - }; + for (ui32 yieldPos = 0; yieldPos < items.size(); ++yieldPos) { + TSetup setup1(alloc); + auto graph1 = buildGraph(setup1, yieldPos, 0); - for (ui32 yieldPos = 0; yieldPos < items.size(); ++yieldPos) { - TSetup setup1(alloc); - auto graph1 = buildGraph(setup1, yieldPos, 0); + auto root1 = graph1->GetValue(); - auto root1 = graph1->GetValue(); + NUdf::TUnboxedValue res; + auto status = root1.Fetch(res); + UNIT_ASSERT_EQUAL(status, NUdf::EFetchStatus::Yield); - NUdf::TUnboxedValue res; - auto status = root1.Fetch(res); - UNIT_ASSERT_EQUAL(status, NUdf::EFetchStatus::Yield); + TString graphState; + SaveGraphState(&root1, 1, 0ULL, graphState); - TString graphState; - SaveGraphState(&root1, 1, 0ULL, graphState); + TSetup setup2(alloc); + auto graph2 = buildGraph(setup2, -1, yieldPos); - TSetup setup2(alloc); - auto graph2 = buildGraph(setup2, -1, yieldPos); + auto root2 = graph2->GetValue(); + LoadGraphState(&root2, 1, 0ULL, graphState); - auto root2 = graph2->GetValue(); - LoadGraphState(&root2, 1, 0ULL, graphState); + status = root2.Fetch(res); + UNIT_ASSERT_EQUAL(status, NUdf::EFetchStatus::Ok); + UNIT_ASSERT_EQUAL(res.Get<ui32>(), 36); - status = root2.Fetch(res); - UNIT_ASSERT_EQUAL(status, NUdf::EFetchStatus::Ok); - UNIT_ASSERT_EQUAL(res.Get<ui32>(), 36); + status = root2.Fetch(res); + UNIT_ASSERT_EQUAL(status, NUdf::EFetchStatus::Finish); + } +} - status = root2.Fetch(res); - UNIT_ASSERT_EQUAL(status, NUdf::EFetchStatus::Finish); +Y_UNIT_TEST(TestHoppingSaveLoad) { + TScopedAlloc alloc(__LOCATION__); + + const std::vector<std::pair<i64, ui32>> items = { + {1, 2}, + {2, 3}, + {15, 4}, + {23, 6}, + {24, 5}, + {25, 7}, + {40, 2}, + {47, 1}, + {51, 6}, + {59, 2}, + {85, 8}, + {55, 1000}, + {200, 0}}; + + auto buildGraph = [&items](TSetup& setup, ui32 yieldPos, ui32 startIndex) -> THolder<IComputationGraph> { + TProgramBuilder& pgmBuilder = *setup.PgmBuilder; + + auto structType = pgmBuilder.NewEmptyStructType(); + structType = pgmBuilder.NewStructType(structType, "time", + pgmBuilder.NewDataType(NUdf::TDataType<NUdf::TTimestamp>::Id)); + structType = pgmBuilder.NewStructType(structType, "sum", + pgmBuilder.NewDataType(NUdf::TDataType<ui32>::Id)); + auto timeIndex = AS_TYPE(TStructType, structType)->GetMemberIndex("time"); + auto sumIndex = AS_TYPE(TStructType, structType)->GetMemberIndex("sum"); + + auto inStreamType = pgmBuilder.NewStreamType(structType); + + TCallableBuilder inStream(pgmBuilder.GetTypeEnvironment(), "OneYieldStream", inStreamType); + auto streamNode = inStream.Build(); + + ui64 hop = 10, interval = 30, delay = 20; + + auto pgmReturn = pgmBuilder.HoppingCore( + TRuntimeNode(streamNode, false), + [&](TRuntimeNode item) { // timeExtractor + return pgmBuilder.Member(item, "time"); + }, + [&](TRuntimeNode item) { // init + std::vector<std::pair<std::string_view, TRuntimeNode>> members; + members.emplace_back("sum", pgmBuilder.Member(item, "sum")); + return pgmBuilder.NewStruct(members); + }, + [&](TRuntimeNode item, TRuntimeNode state) { // update + auto add = pgmBuilder.AggrAdd( + pgmBuilder.Member(item, "sum"), + pgmBuilder.Member(state, "sum")); + std::vector<std::pair<std::string_view, TRuntimeNode>> members; + members.emplace_back("sum", add); + return pgmBuilder.NewStruct(members); + }, + [&](TRuntimeNode state) { // save + return pgmBuilder.Member(state, "sum"); + }, + [&](TRuntimeNode savedState) { // load + std::vector<std::pair<std::string_view, TRuntimeNode>> members; + members.emplace_back("sum", savedState); + return pgmBuilder.NewStruct(members); + }, + [&](TRuntimeNode state1, TRuntimeNode state2) { // merge + auto add = pgmBuilder.AggrAdd( + pgmBuilder.Member(state1, "sum"), + pgmBuilder.Member(state2, "sum")); + std::vector<std::pair<std::string_view, TRuntimeNode>> members; + members.emplace_back("sum", add); + return pgmBuilder.NewStruct(members); + }, + [&](TRuntimeNode state, TRuntimeNode time) { // finish + Y_UNUSED(time); + std::vector<std::pair<std::string_view, TRuntimeNode>> members; + members.emplace_back("sum", pgmBuilder.Member(state, "sum")); + return pgmBuilder.NewStruct(members); + }, + pgmBuilder.NewDataLiteral<NUdf::EDataSlot::Interval>(NUdf::TStringRef((const char*)&hop, sizeof(hop))), // hop + pgmBuilder.NewDataLiteral<NUdf::EDataSlot::Interval>(NUdf::TStringRef((const char*)&interval, sizeof(interval))), // interval + pgmBuilder.NewDataLiteral<NUdf::EDataSlot::Interval>(NUdf::TStringRef((const char*)&delay, sizeof(delay))) // delay + ); + + auto graph = setup.BuildGraph(pgmReturn, {streamNode}); + + TUnboxedValueVector streamItems; + for (size_t i = 0; i < items.size(); ++i) { + NUdf::TUnboxedValue* itemsPtr; + auto structValues = graph->GetHolderFactory().CreateDirectArrayHolder(2, itemsPtr); + itemsPtr[timeIndex] = NUdf::TUnboxedValuePod(items[i].first); + itemsPtr[sumIndex] = NUdf::TUnboxedValuePod(items[i].second); + streamItems.push_back(std::move(structValues)); } - } - Y_UNIT_TEST(TestHoppingSaveLoad) { - TScopedAlloc alloc(__LOCATION__); - - const std::vector<std::pair<i64, ui32>> items = { - {1, 2}, - {2, 3}, - {15, 4}, - {23, 6}, - {24, 5}, - {25, 7}, - {40, 2}, - {47, 1}, - {51, 6}, - {59, 2}, - {85, 8}, - {55, 1000}, - {200, 0} - }; - - auto buildGraph = [&items] (TSetup& setup, ui32 yieldPos, ui32 startIndex) -> THolder<IComputationGraph> { - TProgramBuilder& pgmBuilder = *setup.PgmBuilder; - - auto structType = pgmBuilder.NewEmptyStructType(); - structType = pgmBuilder.NewStructType(structType, "time", - pgmBuilder.NewDataType(NUdf::TDataType<NUdf::TTimestamp>::Id)); - structType = pgmBuilder.NewStructType(structType, "sum", - pgmBuilder.NewDataType(NUdf::TDataType<ui32>::Id)); - auto timeIndex = AS_TYPE(TStructType, structType)->GetMemberIndex("time"); - auto sumIndex = AS_TYPE(TStructType, structType)->GetMemberIndex("sum"); - - auto inStreamType = pgmBuilder.NewStreamType(structType); - - TCallableBuilder inStream(pgmBuilder.GetTypeEnvironment(), "OneYieldStream", inStreamType); - auto streamNode = inStream.Build(); - - ui64 hop = 10, interval = 30, delay = 20; - - auto pgmReturn = pgmBuilder.HoppingCore( - TRuntimeNode(streamNode, false), - [&](TRuntimeNode item) { // timeExtractor - return pgmBuilder.Member(item, "time"); - }, - [&](TRuntimeNode item) { // init - std::vector<std::pair<std::string_view, TRuntimeNode>> members; - members.emplace_back("sum", pgmBuilder.Member(item, "sum")); - return pgmBuilder.NewStruct(members); - }, - [&](TRuntimeNode item, TRuntimeNode state) { // update - auto add = pgmBuilder.AggrAdd( - pgmBuilder.Member(item, "sum"), - pgmBuilder.Member(state, "sum")); - std::vector<std::pair<std::string_view, TRuntimeNode>> members; - members.emplace_back("sum", add); - return pgmBuilder.NewStruct(members); - }, - [&](TRuntimeNode state) { // save - return pgmBuilder.Member(state, "sum"); - }, - [&](TRuntimeNode savedState) { // load - std::vector<std::pair<std::string_view, TRuntimeNode>> members; - members.emplace_back("sum", savedState); - return pgmBuilder.NewStruct(members); - }, - [&](TRuntimeNode state1, TRuntimeNode state2) { // merge - auto add = pgmBuilder.AggrAdd( - pgmBuilder.Member(state1, "sum"), - pgmBuilder.Member(state2, "sum")); - std::vector<std::pair<std::string_view, TRuntimeNode>> members; - members.emplace_back("sum", add); - return pgmBuilder.NewStruct(members); - }, - [&](TRuntimeNode state, TRuntimeNode time) { // finish - Y_UNUSED(time); - std::vector<std::pair<std::string_view, TRuntimeNode>> members; - members.emplace_back("sum", pgmBuilder.Member(state, "sum")); - return pgmBuilder.NewStruct(members); - }, - pgmBuilder.NewDataLiteral<NUdf::EDataSlot::Interval>(NUdf::TStringRef((const char*)&hop, sizeof(hop))), // hop - pgmBuilder.NewDataLiteral<NUdf::EDataSlot::Interval>(NUdf::TStringRef((const char*)&interval, sizeof(interval))), // interval - pgmBuilder.NewDataLiteral<NUdf::EDataSlot::Interval>(NUdf::TStringRef((const char*)&delay, sizeof(delay))) // delay - ); - - auto graph = setup.BuildGraph(pgmReturn, {streamNode}); - - TUnboxedValueVector streamItems; - for (size_t i = 0; i < items.size(); ++i) { - NUdf::TUnboxedValue* itemsPtr; - auto structValues = graph->GetHolderFactory().CreateDirectArrayHolder(2, itemsPtr); - itemsPtr[timeIndex] = NUdf::TUnboxedValuePod(items[i].first); - itemsPtr[sumIndex] = NUdf::TUnboxedValuePod(items[i].second); - streamItems.push_back(std::move(structValues)); - } + auto streamValue = NUdf::TUnboxedValuePod(new TStreamWithYield(streamItems, yieldPos, startIndex)); + graph->GetEntryPoint(0, true)->SetValue(graph->GetContext(), std::move(streamValue)); + return graph; + }; - auto streamValue = NUdf::TUnboxedValuePod(new TStreamWithYield(streamItems, yieldPos, startIndex)); - graph->GetEntryPoint(0, true)->SetValue(graph->GetContext(), std::move(streamValue)); - return graph; - }; - - for (ui32 yieldPos = 0; yieldPos < items.size(); ++yieldPos) { - std::vector<ui32> result; - - TSetup setup1(alloc); - auto graph1 = buildGraph(setup1, yieldPos, 0); - auto root1 = graph1->GetValue(); - - NUdf::EFetchStatus status = NUdf::EFetchStatus::Ok; - while (status == NUdf::EFetchStatus::Ok) { - NUdf::TUnboxedValue val; - status = root1.Fetch(val); - if (status == NUdf::EFetchStatus::Ok) { - result.push_back(val.GetElement(0).Get<ui32>()); - } + for (ui32 yieldPos = 0; yieldPos < items.size(); ++yieldPos) { + std::vector<ui32> result; + + TSetup setup1(alloc); + auto graph1 = buildGraph(setup1, yieldPos, 0); + auto root1 = graph1->GetValue(); + + NUdf::EFetchStatus status = NUdf::EFetchStatus::Ok; + while (status == NUdf::EFetchStatus::Ok) { + NUdf::TUnboxedValue val; + status = root1.Fetch(val); + if (status == NUdf::EFetchStatus::Ok) { + result.push_back(val.GetElement(0).Get<ui32>()); } - UNIT_ASSERT_EQUAL(status, NUdf::EFetchStatus::Yield); - - TString graphState; - SaveGraphState(&root1, 1, 0ULL, graphState); - - TSetup setup2(alloc); - auto graph2 = buildGraph(setup2, -1, yieldPos); - auto root2 = graph2->GetValue(); - LoadGraphState(&root2, 1, 0ULL, graphState); - - status = NUdf::EFetchStatus::Ok; - while (status == NUdf::EFetchStatus::Ok) { - NUdf::TUnboxedValue val; - status = root2.Fetch(val); - if (status == NUdf::EFetchStatus::Ok) { - result.push_back(val.GetElement(0).Get<ui32>()); - } + } + UNIT_ASSERT_EQUAL(status, NUdf::EFetchStatus::Yield); + + TString graphState; + SaveGraphState(&root1, 1, 0ULL, graphState); + + TSetup setup2(alloc); + auto graph2 = buildGraph(setup2, -1, yieldPos); + auto root2 = graph2->GetValue(); + LoadGraphState(&root2, 1, 0ULL, graphState); + + status = NUdf::EFetchStatus::Ok; + while (status == NUdf::EFetchStatus::Ok) { + NUdf::TUnboxedValue val; + status = root2.Fetch(val); + if (status == NUdf::EFetchStatus::Ok) { + result.push_back(val.GetElement(0).Get<ui32>()); } - UNIT_ASSERT_EQUAL(status, NUdf::EFetchStatus::Finish); - - const std::vector<ui32> resultCompare = {5, 9, 27, 22, 21, 11, 11, 8, 8, 8, 8}; - UNIT_ASSERT_EQUAL(result, resultCompare); } + UNIT_ASSERT_EQUAL(status, NUdf::EFetchStatus::Finish); + + const std::vector<ui32> resultCompare = {5, 9, 27, 22, 21, 11, 11, 8, 8, 8, 8}; + UNIT_ASSERT_EQUAL(result, resultCompare); } } +} // Y_UNIT_TEST_SUITE(TMiniKQLSaveLoadTest) } // namespace NMiniKQL } // namespace NKikimr |
